Skip to content

Commit d4863b0

Browse files
committed
feat(backend): use retrypolicy.Do with independent retry, remove cascading cancellation
Signed-off-by: Zhao Chen <winters.zc@antgroup.com>
1 parent 7e6f63d commit d4863b0

9 files changed

Lines changed: 274 additions & 83 deletions

File tree

internal/pb/pb.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,28 @@ func (p *ProgressBar) Add(prompt, name string, size int64, reader io.Reader) io.
136136
return reader
137137
}
138138

139+
// Placeholder creates or resets a progress bar entry without a reader.
140+
// It is used during retry backoff to keep a visible bar for the item.
141+
func (p *ProgressBar) Placeholder(name string, prompt string, size int64) {
142+
if disableProgress {
143+
return
144+
}
145+
146+
p.mu.RLock()
147+
existing := p.bars[name]
148+
p.mu.RUnlock()
149+
150+
// If the bar already exists, just reset its message.
151+
if existing != nil {
152+
existing.msg = fmt.Sprintf("%s %s", prompt, name)
153+
existing.Bar.SetCurrent(0)
154+
return
155+
}
156+
157+
// Create a new placeholder bar.
158+
p.Add(prompt, name, size, nil)
159+
}
160+
139161
// Get returns the progress bar.
140162
func (p *ProgressBar) Get(name string) *progressBar {
141163
p.mu.RLock()

pkg/backend/build.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"os"
2424
"path/filepath"
2525

26-
retry "github.com/avast/retry-go/v4"
2726
modelspec "github.com/modelpack/model-spec/specs-go/v1"
2827
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
2928
"github.com/sirupsen/logrus"
@@ -35,6 +34,7 @@ import (
3534
"github.com/modelpack/modctl/pkg/backend/processor"
3635
"github.com/modelpack/modctl/pkg/config"
3736
"github.com/modelpack/modctl/pkg/modelfile"
37+
"github.com/modelpack/modctl/pkg/retrypolicy"
3838
"github.com/modelpack/modctl/pkg/source"
3939
)
4040

@@ -123,8 +123,8 @@ func (b *backend) Build(ctx context.Context, modelfilePath, workDir, target stri
123123

124124
var configDesc ocispec.Descriptor
125125
// Build the model config.
126-
if err := retry.Do(func() error {
127-
configDesc, err = builder.BuildConfig(ctx, config, hooks.NewHooks(
126+
if err := retrypolicy.Do(ctx, func(rctx context.Context) error {
127+
configDesc, err = builder.BuildConfig(rctx, config, hooks.NewHooks(
128128
hooks.WithOnStart(func(name string, size int64, reader io.Reader) io.Reader {
129129
return pb.Add(internalpb.NormalizePrompt("Building config"), name, size, reader)
130130
}),
@@ -136,13 +136,17 @@ func (b *backend) Build(ctx context.Context, modelfilePath, workDir, target stri
136136
}),
137137
))
138138
return err
139-
}, append(defaultRetryOpts, retry.Context(ctx))...); err != nil {
139+
}, retrypolicy.DoOpts{
140+
FileSize: 0, // config is small
141+
FileName: "config",
142+
Config: &cfg.RetryConfig,
143+
}); err != nil {
140144
return fmt.Errorf("failed to build model config: %w", err)
141145
}
142146

143147
// Build the model manifest.
144-
if err := retry.Do(func() error {
145-
_, err = builder.BuildManifest(ctx, layers, configDesc, manifestAnnotation(modelfile), hooks.NewHooks(
148+
if err := retrypolicy.Do(ctx, func(rctx context.Context) error {
149+
_, err = builder.BuildManifest(rctx, layers, configDesc, manifestAnnotation(modelfile), hooks.NewHooks(
146150
hooks.WithOnStart(func(name string, size int64, reader io.Reader) io.Reader {
147151
return pb.Add(internalpb.NormalizePrompt("Building manifest"), name, size, reader)
148152
}),
@@ -154,7 +158,11 @@ func (b *backend) Build(ctx context.Context, modelfilePath, workDir, target stri
154158
}),
155159
))
156160
return err
157-
}, append(defaultRetryOpts, retry.Context(ctx))...); err != nil {
161+
}, retrypolicy.DoOpts{
162+
FileSize: 0, // manifest is small
163+
FileName: "manifest",
164+
Config: &cfg.RetryConfig,
165+
}); err != nil {
158166
return fmt.Errorf("failed to build model manifest: %w", err)
159167
}
160168

@@ -204,7 +212,7 @@ func (b *backend) getProcessors(modelfile modelfile.Modelfile, cfg *config.Build
204212
func (b *backend) process(ctx context.Context, builder build.Builder, workDir string, pb *internalpb.ProgressBar, cfg *config.Build, processors ...processor.Processor) ([]ocispec.Descriptor, error) {
205213
descriptors := []ocispec.Descriptor{}
206214
for _, p := range processors {
207-
descs, err := p.Process(ctx, builder, workDir, processor.WithConcurrency(cfg.Concurrency), processor.WithProgressTracker(pb))
215+
descs, err := p.Process(ctx, builder, workDir, processor.WithConcurrency(cfg.Concurrency), processor.WithProgressTracker(pb), processor.WithRetryConfig(cfg.RetryConfig))
208216
if err != nil {
209217
return nil, err
210218
}

pkg/backend/fetch.go

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ package backend
1919
import (
2020
"context"
2121
"encoding/json"
22+
"errors"
2223
"fmt"
24+
"sync"
25+
"time"
2326

2427
"github.com/bmatcuk/doublestar/v4"
2528
legacymodelspec "github.com/dragonflyoss/model-spec/specs-go/v1"
@@ -31,6 +34,7 @@ import (
3134
internalpb "github.com/modelpack/modctl/internal/pb"
3235
"github.com/modelpack/modctl/pkg/backend/remote"
3336
"github.com/modelpack/modctl/pkg/config"
37+
"github.com/modelpack/modctl/pkg/retrypolicy"
3438
)
3539

3640
// Fetch fetches partial files to the output.
@@ -101,9 +105,12 @@ func (b *backend) Fetch(ctx context.Context, target string, cfg *config.Fetch) e
101105
pb.Start()
102106
defer pb.Stop()
103107

104-
g, ctx := errgroup.WithContext(ctx)
108+
g := new(errgroup.Group)
105109
g.SetLimit(cfg.Concurrency)
106110

111+
var mu sync.Mutex
112+
var errs []error
113+
107114
logrus.Infof("fetch: fetching %d matched layers", len(layers))
108115
for _, layer := range layers {
109116
g.Go(func() error {
@@ -113,17 +120,42 @@ func (b *backend) Fetch(ctx context.Context, target string, cfg *config.Fetch) e
113120
default:
114121
}
115122

116-
logrus.Debugf("fetch: processing layer %s", layer.Digest)
117-
if err := pullAndExtractFromRemote(ctx, pb, internalpb.NormalizePrompt("Fetching blob"), client, cfg.Output, layer); err != nil {
118-
return err
123+
var annoFilepath string
124+
if layer.Annotations != nil {
125+
if layer.Annotations[modelspec.AnnotationFilepath] != "" {
126+
annoFilepath = layer.Annotations[modelspec.AnnotationFilepath]
127+
} else {
128+
annoFilepath = layer.Annotations[legacymodelspec.AnnotationFilepath]
129+
}
119130
}
120131

121-
logrus.Debugf("fetch: successfully processed layer %s", layer.Digest)
132+
logrus.Debugf("fetch: processing layer %s", layer.Digest)
133+
if err := retrypolicy.Do(ctx, func(ctx context.Context) error {
134+
return pullAndExtractFromRemote(ctx, pb, internalpb.NormalizePrompt("Fetching blob"), client, cfg.Output, layer)
135+
}, retrypolicy.DoOpts{
136+
FileSize: layer.Size,
137+
FileName: annoFilepath,
138+
Config: &cfg.RetryConfig,
139+
OnRetry: func(attempt uint, reason string, backoff time.Duration) {
140+
if bar := pb.Get(layer.Digest.String()); bar != nil {
141+
bar.SetRefill(bar.Current())
142+
bar.SetCurrent(0)
143+
bar.EwmaSetCurrent(0, time.Second)
144+
}
145+
},
146+
}); err != nil {
147+
mu.Lock()
148+
errs = append(errs, err)
149+
mu.Unlock()
150+
} else {
151+
logrus.Debugf("fetch: successfully processed layer %s", layer.Digest)
152+
}
122153
return nil
123154
})
124155
}
125156

126-
if err := g.Wait(); err != nil {
157+
_ = g.Wait()
158+
if err := errors.Join(errs...); err != nil {
127159
return err
128160
}
129161

pkg/backend/fetch_by_d7y.go

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,17 @@ package backend
1919
import (
2020
"context"
2121
"encoding/json"
22+
"errors"
2223
"fmt"
2324
"io"
2425
"os"
2526
"path/filepath"
2627
"strings"
28+
"sync"
29+
"time"
2730

2831
common "d7y.io/api/v2/pkg/apis/common/v2"
2932
dfdaemon "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
30-
"github.com/avast/retry-go/v4"
3133
"github.com/bmatcuk/doublestar/v4"
3234
legacymodelspec "github.com/dragonflyoss/model-spec/specs-go/v1"
3335
modelspec "github.com/modelpack/model-spec/specs-go/v1"
@@ -41,6 +43,7 @@ import (
4143
"github.com/modelpack/modctl/pkg/archiver"
4244
"github.com/modelpack/modctl/pkg/backend/remote"
4345
"github.com/modelpack/modctl/pkg/config"
46+
"github.com/modelpack/modctl/pkg/retrypolicy"
4447
)
4548

4649
// fetchByDragonfly fetches partial files via Dragonfly gRPC service based on pattern matching.
@@ -124,9 +127,12 @@ func (b *backend) fetchByDragonfly(ctx context.Context, target string, cfg *conf
124127
defer pb.Stop()
125128

126129
// Process layers concurrently.
127-
g, ctx := errgroup.WithContext(ctx)
130+
g := new(errgroup.Group)
128131
g.SetLimit(cfg.Concurrency)
129132

133+
var mu sync.Mutex
134+
var errs []error
135+
130136
logrus.Infof("fetch: fetching %d matched layers via dragonfly", len(layers))
131137
for _, layer := range layers {
132138
g.Go(func() error {
@@ -138,14 +144,18 @@ func (b *backend) fetchByDragonfly(ctx context.Context, target string, cfg *conf
138144

139145
logrus.Debugf("fetch: processing layer %s via dragonfly", layer.Digest)
140146
if err := fetchLayerByDragonfly(ctx, pb, dfdaemon.NewDfdaemonDownloadClient(conn), ref, manifest, layer, authToken, cfg); err != nil {
141-
return err
147+
mu.Lock()
148+
errs = append(errs, err)
149+
mu.Unlock()
150+
} else {
151+
logrus.Debugf("fetch: successfully processed layer %s via dragonfly", layer.Digest)
142152
}
143-
logrus.Debugf("fetch: successfully processed layer %s via dragonfly", layer.Digest)
144153
return nil
145154
})
146155
}
147156

148-
if err := g.Wait(); err != nil {
157+
_ = g.Wait()
158+
if err := errors.Join(errs...); err != nil {
149159
return err
150160
}
151161

@@ -155,7 +165,16 @@ func (b *backend) fetchByDragonfly(ctx context.Context, target string, cfg *conf
155165

156166
// fetchLayerByDragonfly handles downloading and extracting a single layer via Dragonfly.
157167
func fetchLayerByDragonfly(ctx context.Context, pb *internalpb.ProgressBar, client dfdaemon.DfdaemonDownloadClient, ref Referencer, manifest ocispec.Manifest, desc ocispec.Descriptor, authToken string, cfg *config.Fetch) error {
158-
err := retry.Do(func() error {
168+
var annoFilepath string
169+
if desc.Annotations != nil {
170+
if desc.Annotations[modelspec.AnnotationFilepath] != "" {
171+
annoFilepath = desc.Annotations[modelspec.AnnotationFilepath]
172+
} else {
173+
annoFilepath = desc.Annotations[legacymodelspec.AnnotationFilepath]
174+
}
175+
}
176+
177+
err := retrypolicy.Do(ctx, func(ctx context.Context) error {
159178
logrus.Debugf("fetch: processing layer %s", desc.Digest)
160179
cfg.Hooks.BeforePullLayer(desc, manifest) // Call before hook
161180
err := downloadAndExtractFetchLayer(ctx, pb, client, ref, desc, authToken, cfg)
@@ -166,7 +185,18 @@ func fetchLayerByDragonfly(ctx context.Context, pb *internalpb.ProgressBar, clie
166185
}
167186

168187
return err
169-
}, append(defaultRetryOpts, retry.Context(ctx))...)
188+
}, retrypolicy.DoOpts{
189+
FileSize: desc.Size,
190+
FileName: annoFilepath,
191+
Config: &cfg.RetryConfig,
192+
OnRetry: func(attempt uint, reason string, backoff time.Duration) {
193+
if bar := pb.Get(desc.Digest.String()); bar != nil {
194+
bar.SetRefill(bar.Current())
195+
bar.SetCurrent(0)
196+
bar.EwmaSetCurrent(0, time.Second)
197+
}
198+
},
199+
})
170200

171201
if err != nil {
172202
err = fmt.Errorf("fetch: failed to download and extract layer %s: %w", desc.Digest, err)

pkg/backend/processor/base.go

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package processor
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"io"
2324
"os"
@@ -26,7 +27,6 @@ import (
2627
"strings"
2728
"sync"
2829

29-
"github.com/avast/retry-go/v4"
3030
legacymodelspec "github.com/dragonflyoss/model-spec/specs-go/v1"
3131
modelspec "github.com/modelpack/model-spec/specs-go/v1"
3232
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@@ -36,6 +36,7 @@ import (
3636
internalpb "github.com/modelpack/modctl/internal/pb"
3737
"github.com/modelpack/modctl/pkg/backend/build"
3838
"github.com/modelpack/modctl/pkg/backend/build/hooks"
39+
"github.com/modelpack/modctl/pkg/retrypolicy"
3940
"github.com/modelpack/modctl/pkg/storage"
4041
)
4142

@@ -105,14 +106,11 @@ func (b *base) Process(ctx context.Context, builder build.Builder, workDir strin
105106

106107
var (
107108
mu sync.Mutex
108-
eg *errgroup.Group
109109
descriptors []ocispec.Descriptor
110+
errs []error
110111
)
111112

112-
// Initialize errgroup with a context can be canceled.
113-
ctx, cancel := context.WithCancel(ctx)
114-
defer cancel()
115-
eg, ctx = errgroup.WithContext(ctx)
113+
eg := new(errgroup.Group)
116114

117115
// Set default concurrency limit to 1 if not specified.
118116
if processOpts.concurrency > 0 {
@@ -137,19 +135,25 @@ func (b *base) Process(ctx context.Context, builder build.Builder, workDir strin
137135
eg.Go(func() error {
138136
select {
139137
case <-ctx.Done():
140-
return ctx.Err()
138+
return nil
141139
default:
142140
}
143141

144-
if err := retry.Do(func() error {
142+
// Get file size for dynamic retry parameters.
143+
var fileSize int64
144+
if fi, err := os.Stat(path); err == nil {
145+
fileSize = fi.Size()
146+
}
147+
148+
if err := retrypolicy.Do(ctx, func(rctx context.Context) error {
145149
logrus.Debugf("processor: processing %s file %s", b.name, path)
146150

147151
var destPath string
148152
if b.destDir != "" {
149153
destPath = filepath.Join(b.destDir, filepath.Base(path))
150154
}
151155

152-
desc, err := builder.BuildLayer(ctx, b.mediaType, workDir, path, destPath, hooks.NewHooks(
156+
desc, err := builder.BuildLayer(rctx, b.mediaType, workDir, path, destPath, hooks.NewHooks(
153157
hooks.WithOnStart(func(name string, size int64, reader io.Reader) io.Reader {
154158
return tracker.Add(internalpb.NormalizePrompt("Building layer"), name, size, reader)
155159
}),
@@ -170,19 +174,24 @@ func (b *base) Process(ctx context.Context, builder build.Builder, workDir strin
170174
mu.Unlock()
171175

172176
return nil
173-
}, append(defaultRetryOpts, retry.Context(ctx))...); err != nil {
177+
}, retrypolicy.DoOpts{
178+
FileSize: fileSize,
179+
FileName: filepath.Base(path),
180+
Config: processOpts.retryConfig,
181+
}); err != nil {
174182
logrus.Error(err)
175-
// Cancel manually to abort other tasks because if one fails,
176-
// we should abort all to avoid useless waiting.
177-
cancel()
178-
return err
183+
mu.Lock()
184+
errs = append(errs, err)
185+
mu.Unlock()
179186
}
180187

181188
return nil
182189
})
183190
}
184191

185-
if err := eg.Wait(); err != nil {
192+
_ = eg.Wait()
193+
194+
if err := errors.Join(errs...); err != nil {
186195
return nil, err
187196
}
188197

0 commit comments

Comments
 (0)