diff --git a/cmd/modelfile/modelfile.go b/cmd/modelfile/modelfile.go index 23f311df..e89563f1 100644 --- a/cmd/modelfile/modelfile.go +++ b/cmd/modelfile/modelfile.go @@ -17,8 +17,6 @@ package modelfile import ( - "github.com/sirupsen/logrus" - "github.com/spf13/cobra" "github.com/spf13/viper" ) @@ -32,8 +30,6 @@ var RootCmd = &cobra.Command{ SilenceUsage: true, FParseErrWhitelist: cobra.FParseErrWhitelist{UnknownFlags: true}, RunE: func(cmd *cobra.Command, args []string) error { - logrus.Debug("modctl modelfile is running") - return nil }, } diff --git a/cmd/root.go b/cmd/root.go index bd1aa521..2b3cf1ca 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -142,5 +142,6 @@ func init() { rootCmd.AddCommand(tagCmd) rootCmd.AddCommand(fetchCmd) rootCmd.AddCommand(attachCmd) + rootCmd.AddCommand(uploadCmd) rootCmd.AddCommand(modelfile.RootCmd) } diff --git a/cmd/upload.go b/cmd/upload.go new file mode 100644 index 00000000..18c73ee4 --- /dev/null +++ b/cmd/upload.go @@ -0,0 +1,75 @@ +/* + * Copyright 2025 The CNAI Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "context" + "fmt" + + "github.com/spf13/cobra" + "github.com/spf13/viper" + + "github.com/CloudNativeAI/modctl/pkg/backend" + "github.com/CloudNativeAI/modctl/pkg/config" +) + +var uploadConfig = config.NewUpload() + +// uploadCmd represents the modctl command for upload. +var uploadCmd = &cobra.Command{ + Use: "upload [flags] ", + Short: "A command line tool for modctl upload", + Args: cobra.ExactArgs(1), + DisableAutoGenTag: true, + SilenceUsage: true, + FParseErrWhitelist: cobra.FParseErrWhitelist{UnknownFlags: true}, + RunE: func(cmd *cobra.Command, args []string) error { + if err := uploadConfig.Validate(); err != nil { + return err + } + + return runUpload(context.Background(), args[0]) + }, +} + +// init initializes upload command. +func init() { + flags := uploadCmd.Flags() + flags.StringVarP(&uploadConfig.Repo, "repo", "", "", "target model artifact repository name") + flags.BoolVarP(&uploadConfig.PlainHTTP, "plain-http", "", false, "turning on this flag will use plain HTTP instead of HTTPS") + flags.BoolVarP(&uploadConfig.Insecure, "insecure", "", false, "turning on this flag will disable TLS verification") + flags.BoolVar(&uploadConfig.Raw, "raw", false, "turning on this flag will upload model artifact layer in raw format") + + if err := viper.BindPFlags(flags); err != nil { + panic(fmt.Errorf("bind cache list flags to viper: %w", err)) + } +} + +// runUpload runs the upload modctl. +func runUpload(ctx context.Context, filepath string) error { + b, err := backend.New(rootConfig.StoargeDir) + if err != nil { + return err + } + + if err := b.Upload(ctx, filepath, uploadConfig); err != nil { + return err + } + + fmt.Printf("Successfully uploaded %s to model artifact repository: %s\n", filepath, uploadConfig.Repo) + return nil +} diff --git a/go.mod b/go.mod index f3bf31ae..fb7cfdbc 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/vbauerster/mpb/v8 v8.10.2 golang.org/x/crypto v0.39.0 golang.org/x/sync v0.15.0 + golang.org/x/sys v0.33.0 google.golang.org/grpc v1.73.0 oras.land/oras-go/v2 v2.6.0 ) @@ -111,7 +112,6 @@ require ( go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.41.0 // indirect - golang.org/x/sys v0.33.0 // indirect golang.org/x/term v0.32.0 // indirect golang.org/x/text v0.26.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250324211829-b45e905df463 // indirect diff --git a/pkg/backend/attach.go b/pkg/backend/attach.go index 901fd53a..4e5c3ceb 100644 --- a/pkg/backend/attach.go +++ b/pkg/backend/attach.go @@ -60,7 +60,7 @@ var ( // Attach attaches user materials into the model artifact which follows the Model Spec. func (b *backend) Attach(ctx context.Context, filepath string, cfg *config.Attach) error { - logrus.Infof("attaching file %s, cfg: %+v", filepath, cfg) + logrus.Infof("attach: starting attach operation for file %s [config: %+v]", filepath, cfg) srcManifest, err := b.getManifest(ctx, cfg.Source, cfg.OutputRemote, cfg.PlainHTTP, cfg.Insecure) if err != nil { return fmt.Errorf("failed to get source manifest: %w", err) @@ -71,7 +71,7 @@ func (b *backend) Attach(ctx context.Context, filepath string, cfg *config.Attac return fmt.Errorf("failed to get source model config: %w", err) } - logrus.Infof("source model config: %+v", srcModelConfig) + logrus.Infof("attach: loaded source model config [%+v]", srcModelConfig) var foundLayer *ocispec.Descriptor for _, layer := range srcManifest.Layers { @@ -87,7 +87,7 @@ func (b *backend) Attach(ctx context.Context, filepath string, cfg *config.Attac } } - logrus.Infof("found original layer: %+v", foundLayer) + logrus.Infof("attach: found existing layer for file %s [%+v]", filepath, foundLayer) layers := srcManifest.Layers if foundLayer != nil { @@ -100,7 +100,7 @@ func (b *backend) Attach(ctx context.Context, filepath string, cfg *config.Attac } } - proc := b.getProcessor(filepath, cfg) + proc := b.getProcessor(filepath, cfg.Raw) if proc == nil { return fmt.Errorf("failed to get processor for file %s", filepath) } @@ -123,7 +123,7 @@ func (b *backend) Attach(ctx context.Context, filepath string, cfg *config.Attac layers = append(layers, newLayers...) sortLayers(layers) - logrus.Infof("new sorted layers: %+v", layers) + logrus.Debugf("attach: generated sorted layers [layers: %+v]", layers) diffIDs := []godigest.Digest{} for _, layer := range layers { @@ -145,7 +145,7 @@ func (b *backend) Attach(ctx context.Context, filepath string, cfg *config.Attac Name: srcModelConfig.Descriptor.Name, } - logrus.Infof("new model config: %+v", modelConfig) + logrus.Infof("attach: built model config [%+v]", modelConfig) configDesc, err := builder.BuildConfig(ctx, layers, modelConfig, hooks.NewHooks( hooks.WithOnStart(func(name string, size int64, reader io.Reader) io.Reader { @@ -178,6 +178,7 @@ func (b *backend) Attach(ctx context.Context, filepath string, cfg *config.Attac return fmt.Errorf("failed to build model manifest: %w", err) } + logrus.Infof("attach: successfully attached file %s", filepath) return nil } @@ -272,10 +273,10 @@ func (b *backend) getModelConfig(ctx context.Context, reference string, desc oci return &model, nil } -func (b *backend) getProcessor(filepath string, cfg *config.Attach) processor.Processor { +func (b *backend) getProcessor(filepath string, rawMediaType bool) processor.Processor { if modelfile.IsFileType(filepath, modelfile.ConfigFilePatterns) { mediaType := modelspec.MediaTypeModelWeightConfig - if cfg.Raw { + if rawMediaType { mediaType = modelspec.MediaTypeModelWeightConfigRaw } return processor.NewModelConfigProcessor(b.store, mediaType, []string{filepath}) @@ -283,7 +284,7 @@ func (b *backend) getProcessor(filepath string, cfg *config.Attach) processor.Pr if modelfile.IsFileType(filepath, modelfile.ModelFilePatterns) { mediaType := modelspec.MediaTypeModelWeight - if cfg.Raw { + if rawMediaType { mediaType = modelspec.MediaTypeModelWeightRaw } return processor.NewModelProcessor(b.store, mediaType, []string{filepath}) @@ -291,7 +292,7 @@ func (b *backend) getProcessor(filepath string, cfg *config.Attach) processor.Pr if modelfile.IsFileType(filepath, modelfile.CodeFilePatterns) { mediaType := modelspec.MediaTypeModelCode - if cfg.Raw { + if rawMediaType { mediaType = modelspec.MediaTypeModelCodeRaw } return processor.NewCodeProcessor(b.store, mediaType, []string{filepath}) @@ -299,7 +300,7 @@ func (b *backend) getProcessor(filepath string, cfg *config.Attach) processor.Pr if modelfile.IsFileType(filepath, modelfile.DocFilePatterns) { mediaType := modelspec.MediaTypeModelDoc - if cfg.Raw { + if rawMediaType { mediaType = modelspec.MediaTypeModelDocRaw } return processor.NewDocProcessor(b.store, mediaType, []string{filepath}) diff --git a/pkg/backend/attach_test.go b/pkg/backend/attach_test.go index ff35215c..bb527fa5 100644 --- a/pkg/backend/attach_test.go +++ b/pkg/backend/attach_test.go @@ -73,7 +73,7 @@ func TestGetProcessor(t *testing.T) { for _, tt := range tests { t.Run(tt.filepath, func(t *testing.T) { - proc := b.getProcessor(tt.filepath, &config.Attach{}) + proc := b.getProcessor(tt.filepath, false) if tt.wantType == "" { assert.Nil(t, proc) } else { diff --git a/pkg/backend/backend.go b/pkg/backend/backend.go index 154db9d6..0f7d727f 100644 --- a/pkg/backend/backend.go +++ b/pkg/backend/backend.go @@ -34,6 +34,9 @@ type Backend interface { // Attach attaches user materials into the model artifact which follows the Model Spec. Attach(ctx context.Context, filepath string, cfg *config.Attach) error + // Upload uploads the file to a model artifact repository in advance, but will not push config and manifest. + Upload(ctx context.Context, filepath string, cfg *config.Upload) error + // Build builds the user materials into the model artifact which follows the Model Spec. Build(ctx context.Context, modelfilePath, workDir, target string, cfg *config.Build) error diff --git a/pkg/backend/build.go b/pkg/backend/build.go index 6e3fd6e0..03ee620c 100644 --- a/pkg/backend/build.go +++ b/pkg/backend/build.go @@ -46,7 +46,7 @@ const ( // Build builds the user materials into the model artifact which follows the Model Spec. func (b *backend) Build(ctx context.Context, modelfilePath, workDir, target string, cfg *config.Build) error { - logrus.Infof("building model artifact: %s, cfg: %+v", target, cfg) + logrus.Infof("build: starting build operation for target %s [config: %+v]", target, cfg) // parse the repo name and tag name from target. ref, err := ParseReference(target) if err != nil { @@ -99,7 +99,7 @@ func (b *backend) Build(ctx context.Context, modelfilePath, workDir, target stri layers = append(layers, layerDescs...) - logrus.Infof("model artifact layers: %+v", layers) + logrus.Infof("build: processed layers for artifact [count: %d, layers: %+v]", len(layers), layers) revision := sourceInfo.Commit if revision != "" && sourceInfo.Dirty { @@ -118,7 +118,7 @@ func (b *backend) Build(ctx context.Context, modelfilePath, workDir, target stri SourceRevision: revision, } - logrus.Infof("model artifact config: %+v", modelConfig) + logrus.Infof("build: built model config [family: %s, name: %s, format: %s]", modelConfig.Family, modelConfig.Name, modelConfig.Format) var configDesc ocispec.Descriptor // Build the model config. @@ -157,6 +157,7 @@ func (b *backend) Build(ctx context.Context, modelfilePath, workDir, target stri return fmt.Errorf("failed to build model manifest: %w", err) } + logrus.Infof("build: successfully built model artifact %s", target) return nil } diff --git a/pkg/backend/build/builder.go b/pkg/backend/build/builder.go index de2b6c25..2a0fcb00 100644 --- a/pkg/backend/build/builder.go +++ b/pkg/backend/build/builder.go @@ -25,6 +25,7 @@ import ( "io" "os" "path/filepath" + "strconv" "sync" "syscall" "time" @@ -35,11 +36,12 @@ import ( spec "github.com/opencontainers/image-spec/specs-go" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/sirupsen/logrus" + "golang.org/x/sys/unix" buildconfig "github.com/CloudNativeAI/modctl/pkg/backend/build/config" "github.com/CloudNativeAI/modctl/pkg/backend/build/hooks" "github.com/CloudNativeAI/modctl/pkg/backend/build/interceptor" - "github.com/CloudNativeAI/modctl/pkg/codec" + pkgcodec "github.com/CloudNativeAI/modctl/pkg/codec" "github.com/CloudNativeAI/modctl/pkg/storage" ) @@ -142,12 +144,12 @@ func (ab *abstractBuilder) BuildLayer(ctx context.Context, mediaType, workDir, p return ocispec.Descriptor{}, fmt.Errorf("failed to get relative path: %w", err) } - codec, err := codec.New(codec.TypeFromMediaType(mediaType)) + codec, err := pkgcodec.New(pkgcodec.TypeFromMediaType(mediaType)) if err != nil { return ocispec.Descriptor{}, fmt.Errorf("failed to create codec: %w", err) } - logrus.Infof("building file %s...", relPath) + logrus.Debugf("builder: starting build layer for file %s", relPath) // Encode the content by codec depends on the media type. reader, err := codec.Encode(path, workDirPath) @@ -155,30 +157,9 @@ func (ab *abstractBuilder) BuildLayer(ctx context.Context, mediaType, workDir, p return ocispec.Descriptor{}, fmt.Errorf("failed to encode file: %w", err) } - logrus.Infof("calculating digest for %s...", relPath) - // Calculate the digest of the encoded content. - hash := sha256.New() - size, err := io.Copy(hash, reader) + reader, digest, size, err := computeDigestAndSize(mediaType, path, workDirPath, info, reader, codec) if err != nil { - return ocispec.Descriptor{}, fmt.Errorf("failed to copy content to hash: %w", err) - } - - digest := fmt.Sprintf("sha256:%x", hash.Sum(nil)) - logrus.Infof("calculated digest for %s: %s", relPath, digest) - - // Seek the reader to the beginning if supported, - // otherwise we needs to re-encode the content again. - if seeker, ok := reader.(io.ReadSeeker); ok { - logrus.Infof("seeking %s reader to beginning...", relPath) - if _, err := seeker.Seek(0, io.SeekStart); err != nil { - return ocispec.Descriptor{}, fmt.Errorf("failed to seek reader: %w", err) - } - } else { - logrus.Infof("%s reader is not seekable, re-encoding...", relPath) - reader, err = codec.Encode(path, workDirPath) - if err != nil { - return ocispec.Descriptor{}, fmt.Errorf("failed to encode file: %w", err) - } + return ocispec.Descriptor{}, fmt.Errorf("failed to compute digest and size: %w", err) } var ( @@ -213,24 +194,10 @@ func (ab *abstractBuilder) BuildLayer(ctx context.Context, mediaType, workDir, p applyDesc(&desc) } - // Retrieve the file metadata. - metadata, err := getFileMetadata(path) - if err != nil { - return desc, fmt.Errorf("failed to retrieve file metadata: %w", err) - } - - metadataStr, err := json.Marshal(metadata) - if err != nil { - return desc, fmt.Errorf("failed to marshal metadata: %w", err) - } - - logrus.Infof("retrieved file %s metadata: %s", relPath, string(metadataStr)) - - // Apply the metadata to the descriptor annotation. - if desc.Annotations == nil { - desc.Annotations = make(map[string]string) + // Add file metadata to descriptor. + if err := addFileMetadata(&desc, path, relPath); err != nil { + return desc, err } - desc.Annotations[modelspec.AnnotationFileMetadata] = string(metadataStr) return desc, nil } @@ -315,6 +282,109 @@ func buildModelConfig(modelConfig *buildconfig.Model, layers []ocispec.Descripto }, nil } +// computeDigestAndSize computes the digest and size for the encoded content, using xattrs if available. +func computeDigestAndSize(mediaType, path, workDirPath string, info os.FileInfo, reader io.Reader, codec pkgcodec.Codec) (io.Reader, string, int64, error) { + var digest string + var size int64 + + if pkgcodec.IsRawMediaType(mediaType) { + // By default let's assume the mtime and size has changed. + mtimeChanged := true + sizeChanged := true + + if mtime, err := getXattr(path, xattrMtimeKey(mediaType)); err == nil { + if string(mtime) == fmt.Sprintf("%d", info.ModTime().UnixNano()) { + mtimeChanged = false + } + } + + if sizeBytes, err := getXattr(path, xattrSizeKey(mediaType)); err == nil { + if parsedSize, err := strconv.ParseInt(string(sizeBytes), 10, 64); err == nil { + if parsedSize == info.Size() { + sizeChanged = false + } + } + } + + if !mtimeChanged && !sizeChanged { + // Check xattrs for cached digest and size. + if sha256, err := getXattr(path, xattrSha256Key(mediaType)); err == nil { + digest = string(sha256) + logrus.Infof("builder: retrieved sha256 hash from xattr for file %s [digest: %s]", path, digest) + } + + if sizeBytes, err := getXattr(path, xattrSizeKey(mediaType)); err == nil { + if parsedSize, err := strconv.ParseInt(string(sizeBytes), 10, 64); err == nil { + size = parsedSize + logrus.Infof("builder: retrieved size from xattr for file %s [size: %d]", path, size) + } + } + } + } + + // Compute digest and size if not retrieved from xattrs. + if digest == "" { + logrus.Infof("builder: calculating digest for file %s", path) + var err error + hash := sha256.New() + size, err = io.Copy(hash, reader) + if err != nil { + return reader, "", 0, fmt.Errorf("failed to copy content to hash: %w", err) + } + digest = fmt.Sprintf("sha256:%x", hash.Sum(nil)) + logrus.Infof("builder: calculated digest for file %s [digest: %s]", path, digest) + + // Reset reader + reader, err = resetReader(reader, path, workDirPath, codec) + if err != nil { + return reader, "", 0, err + } + + // Store xattrs if raw media type. + if pkgcodec.IsRawMediaType(mediaType) { + setXattr(path, xattrMtimeKey(mediaType), fmt.Appendf([]byte{}, "%d", info.ModTime().UnixNano())) + setXattr(path, xattrSha256Key(mediaType), []byte(digest)) + setXattr(path, xattrSizeKey(mediaType), fmt.Appendf([]byte{}, "%d", size)) + } + } + + return reader, digest, size, nil +} + +// resetReader resets the reader to the beginning or re-encodes if not seekable. +func resetReader(reader io.Reader, path, workDirPath string, codec pkgcodec.Codec) (io.Reader, error) { + if seeker, ok := reader.(io.ReadSeeker); ok { + logrus.Debugf("builder: seeking reader to beginning for file %s", path) + if _, err := seeker.Seek(0, io.SeekStart); err != nil { + return nil, fmt.Errorf("failed to seek reader: %w", err) + } + return reader, nil + } + + logrus.Debugf("builder: reader not seekable, re-encoding file %s", path) + return codec.Encode(path, workDirPath) +} + +// addFileMetadata adds file metadata to the descriptor. +func addFileMetadata(desc *ocispec.Descriptor, path, relPath string) error { + metadata, err := getFileMetadata(path) + if err != nil { + return fmt.Errorf("failed to retrieve file metadata: %w", err) + } + + metadataStr, err := json.Marshal(metadata) + if err != nil { + return fmt.Errorf("failed to marshal metadata: %w", err) + } + logrus.Infof("builder: retrieved metadata for file %s [metadata: %s]", relPath, string(metadataStr)) + + if desc.Annotations == nil { + desc.Annotations = make(map[string]string) + } + desc.Annotations[modelspec.AnnotationFileMetadata] = string(metadataStr) + return nil +} + // splitReader splits the original reader into two readers. func splitReader(original io.Reader) (io.Reader, io.Reader) { r1, w1 := io.Pipe() @@ -368,3 +438,52 @@ func getFileMetadata(path string) (modelspec.FileMetadata, error) { return metadata, nil } + +func xattrSha256Key(mediaType string) string { + // Uniformity between linux and mac platforms is simplified by adding the prefix 'user.', + // because the key may be unlimited under mac, + // but on linux, in some cases, the user can only manipulate the user space. + return fmt.Sprintf("user.%s.sha256", mediaType) +} + +func xattrSizeKey(mediaType string) string { + // Uniformity between linux and mac platforms is simplified by adding the prefix 'user.', + // because the key may be unlimited under mac, + // but on linux, in some cases, the user can only manipulate the user space. + return fmt.Sprintf("user.%s.size", mediaType) +} + +func xattrMtimeKey(mediaType string) string { + // Uniformity between linux and mac platforms is simplified by adding the prefix 'user.', + // because the key may be unlimited under mac, + // but on linux, in some cases, the user can only manipulate the user space. + return fmt.Sprintf("user.%s.mtime", mediaType) +} + +// getXattr retrieves an xattr value for a given key. +func getXattr(path, key string) ([]byte, error) { + var value []byte + sz, err := unix.Getxattr(path, key, value) + if err != nil { + logrus.Warnf("builder: failed to get xattr %s for file %s: %v", key, path, err) + return nil, err + } + + value = make([]byte, sz) + _, err = unix.Getxattr(path, key, value) + if err != nil { + logrus.Warnf("builder: failed to get xattr %s for file %s: %v", key, path, err) + return nil, err + } + + return value, nil +} + +// setXattr sets an xattr value for a given key. +func setXattr(path, key string, value []byte) { + if err := unix.Setxattr(path, key, value, 0); err != nil { + logrus.Warnf("builder: failed to set xattr %s for file %s: %v", key, path, err) + } else { + logrus.Infof("builder: set xattr %s for file %s: %s", key, path, string(value)) + } +} diff --git a/pkg/backend/build/remote.go b/pkg/backend/build/remote.go index 9b668dd6..5a65af8b 100644 --- a/pkg/backend/build/remote.go +++ b/pkg/backend/build/remote.go @@ -70,7 +70,10 @@ func (ro *remoteOutput) OutputLayer(ctx context.Context, mediaType, relPath, dig if exist { // In case the reader is from PipeReader, we need to read the whole reader to avoid the pipe being blocked. - io.Copy(io.Discard, reader) + if _, ok := reader.(*io.PipeReader); ok { + io.Copy(io.Discard, reader) + } + hooks.OnComplete(relPath, desc) return desc, nil } diff --git a/pkg/backend/extract.go b/pkg/backend/extract.go index 1cdc3a58..aed6d6f9 100644 --- a/pkg/backend/extract.go +++ b/pkg/backend/extract.go @@ -40,7 +40,7 @@ const ( // Extract extracts the model artifact. func (b *backend) Extract(ctx context.Context, target string, cfg *config.Extract) error { - logrus.Infof("extracting model artifact: %s, cfg: %+v", target, cfg) + logrus.Infof("extract: starting extract operation for target %s [config: %+v]", target, cfg) // parse the repository and tag from the target. ref, err := ParseReference(target) if err != nil { @@ -59,7 +59,7 @@ func (b *backend) Extract(ctx context.Context, target string, cfg *config.Extrac return fmt.Errorf("failed to unmarshal the manifest: %w", err) } - logrus.Infof("manifest: %s", string(manifestRaw)) + logrus.Debugf("extract: loaded manifest for target %s [manifest: %s]", target, string(manifestRaw)) return exportModelArtifact(ctx, b.store, manifest, repo, cfg) } @@ -69,7 +69,7 @@ func exportModelArtifact(ctx context.Context, store storage.Storage, manifest oc g, ctx := errgroup.WithContext(ctx) g.SetLimit(cfg.Concurrency) - logrus.Infof("extracting %d layers in total...", len(manifest.Layers)) + logrus.Infof("extract: processing layers for target %s [count: %d]", repo, len(manifest.Layers)) for _, layer := range manifest.Layers { g.Go(func() error { select { @@ -78,7 +78,7 @@ func exportModelArtifact(ctx context.Context, store storage.Storage, manifest oc default: } - logrus.Infof("extracting layer %s...", layer.Digest.String()) + logrus.Debugf("extract: processing layer %s", layer.Digest.String()) // pull the blob from the storage. reader, err := store.PullBlob(ctx, repo, layer.Digest.String()) if err != nil { @@ -91,13 +91,18 @@ func exportModelArtifact(ctx context.Context, store storage.Storage, manifest oc return fmt.Errorf("failed to extract layer %s: %w", layer.Digest.String(), err) } - logrus.Infof("extracted layer %s successfully", layer.Digest.String()) + logrus.Debugf("extract: successfully processed layer %s", layer.Digest.String()) return nil }) } - return g.Wait() + if err := g.Wait(); err != nil { + return err + } + + logrus.Infof("extract: successfully extracted model artifact %s", repo) + return nil } // extractLayer extracts the layer to the output directory. diff --git a/pkg/backend/fetch.go b/pkg/backend/fetch.go index 9b752d9a..803f2a74 100644 --- a/pkg/backend/fetch.go +++ b/pkg/backend/fetch.go @@ -34,7 +34,7 @@ import ( // Fetch fetches partial files to the output. func (b *backend) Fetch(ctx context.Context, target string, cfg *config.Fetch) error { - logrus.Infof("fetching partial files, target: %s, cfg: %+v", target, cfg) + logrus.Infof("fetch: starting fetch operation for target %s [config: %+v]", target, cfg) // parse the repository and tag from the target. ref, err := ParseReference(target) if err != nil { @@ -59,7 +59,7 @@ func (b *backend) Fetch(ctx context.Context, target string, cfg *config.Fetch) e return fmt.Errorf("failed to decode the manifest: %w", err) } - logrus.Infof("manifest: %+v", manifest) + logrus.Debugf("fetch: loaded manifest for target %s [manifest: %+v]", target, manifest) layers := []ocispec.Descriptor{} // filter the layers by patterns. @@ -89,7 +89,7 @@ func (b *backend) Fetch(ctx context.Context, target string, cfg *config.Fetch) e g, ctx := errgroup.WithContext(ctx) g.SetLimit(cfg.Concurrency) - logrus.Infof("fetching %d layers in total...", len(layers)) + logrus.Infof("fetch: processing matched layers [count: %d]", len(layers)) for _, layer := range layers { g.Go(func() error { select { @@ -98,16 +98,20 @@ func (b *backend) Fetch(ctx context.Context, target string, cfg *config.Fetch) e default: } - logrus.Infof("fetching layer %s...", layer.Digest) + logrus.Debugf("fetch: processing layer %s", layer.Digest) if err := pullAndExtractFromRemote(ctx, pb, internalpb.NormalizePrompt("Fetching blob"), client, cfg.Output, layer); err != nil { return err } - logrus.Infof("layer %s fetched successfully", layer.Digest) + logrus.Debugf("fetch: successfully processed layer %s", layer.Digest) return nil }) } - logrus.Infof("fetched %d layers in total successfully", len(layers)) - return g.Wait() + if err := g.Wait(); err != nil { + return err + } + + logrus.Infof("fetch: successfully fetched layers [count: %d]", len(layers)) + return nil } diff --git a/pkg/backend/inspect.go b/pkg/backend/inspect.go index 5c529603..5e7b3c0d 100644 --- a/pkg/backend/inspect.go +++ b/pkg/backend/inspect.go @@ -67,7 +67,7 @@ type InspectedModelArtifactLayer struct { // Inspect inspects the target from the storage. func (b *backend) Inspect(ctx context.Context, target string, cfg *config.Inspect) (*InspectedModelArtifact, error) { - logrus.Infof("inspecting target: %s, cfg: %+v", target, cfg) + logrus.Infof("inspect: starting inspect operation for target %s [config: %+v]", target, cfg) _, err := ParseReference(target) if err != nil { return nil, fmt.Errorf("failed to parse target: %w", err) @@ -83,14 +83,14 @@ func (b *backend) Inspect(ctx context.Context, target string, cfg *config.Inspec return nil, fmt.Errorf("failed to marshal manifest: %w", err) } - logrus.Infof("manifest: %s", string(manifestRaw)) + logrus.Debugf("inspect: loaded manifest for target %s [manifest: %s]", target, string(manifestRaw)) config, err := b.getModelConfig(ctx, target, manifest.Config, cfg.Remote, cfg.PlainHTTP, cfg.Insecure) if err != nil { return nil, fmt.Errorf("failed to get config: %w", err) } - logrus.Infof("model config: %+v", config) + logrus.Debugf("inspect: loaded model config for target %s [family: %s, name: %s]", target, config.Descriptor.Family, config.Descriptor.Name) inspectedModelArtifact := &InspectedModelArtifact{ ID: manifest.Config.Digest.String(), @@ -116,5 +116,6 @@ func (b *backend) Inspect(ctx context.Context, target string, cfg *config.Inspec }) } + logrus.Infof("inspect: successfully inspected target %s", target) return inspectedModelArtifact, nil } diff --git a/pkg/backend/list.go b/pkg/backend/list.go index dd8333c9..b6028ee4 100644 --- a/pkg/backend/list.go +++ b/pkg/backend/list.go @@ -44,7 +44,7 @@ type ModelArtifact struct { // List lists all the model artifacts. func (b *backend) List(ctx context.Context) ([]*ModelArtifact, error) { - logrus.Info("listing model artifacts") + logrus.Info("list: starting list operation for model artifacts") modelArtifacts := []*ModelArtifact{} // list all the repositories. @@ -53,7 +53,7 @@ func (b *backend) List(ctx context.Context) ([]*ModelArtifact, error) { return nil, fmt.Errorf("failed to list repositories: %w", err) } - logrus.Infof("listed %d repositories: %+v", len(repos), repos) + logrus.Debugf("list: loaded repositories [count: %d]", len(repos)) // list all the tags in the repository. for _, repo := range repos { @@ -62,7 +62,7 @@ func (b *backend) List(ctx context.Context) ([]*ModelArtifact, error) { return nil, fmt.Errorf("failed to list tags in repository %s: %w", repo, err) } - logrus.Infof("listed %d tags in repository %s: %+v", len(tags), repo, tags) + logrus.Debugf("list: loaded tags for repository %s [count: %d]", repo, len(tags)) // assemble the model artifact. for _, tag := range tags { @@ -79,6 +79,7 @@ func (b *backend) List(ctx context.Context) ([]*ModelArtifact, error) { return modelArtifacts[i].CreatedAt.After(modelArtifacts[j].CreatedAt) }) + logrus.Infof("list: successfully listed model artifacts [count: %d]", len(modelArtifacts)) return modelArtifacts, nil } diff --git a/pkg/backend/login.go b/pkg/backend/login.go index 48a47ec2..cff5f53f 100644 --- a/pkg/backend/login.go +++ b/pkg/backend/login.go @@ -21,6 +21,7 @@ import ( "crypto/tls" "net/http" + "github.com/sirupsen/logrus" "oras.land/oras-go/v2/registry/remote" "oras.land/oras-go/v2/registry/remote/auth" "oras.land/oras-go/v2/registry/remote/credentials" @@ -31,6 +32,7 @@ import ( // Login logs into a registry. func (b *backend) Login(ctx context.Context, registry, username, password string, cfg *config.Login) error { + logrus.Infof("login: starting login operation for registry %s [user: %s]", registry, username) // read credentials from docker store. store, err := credentials.NewStoreFromDocker(credentials.StoreOptions{AllowPlaintextPut: true}) if err != nil { @@ -64,5 +66,10 @@ func (b *backend) Login(ctx context.Context, registry, username, password string Password: password, } - return credentials.Login(ctx, store, reg, cred) + if err := credentials.Login(ctx, store, reg, cred); err != nil { + return err + } + + logrus.Infof("login: successfully logged into registry %s [user: %s]", registry, username) + return nil } diff --git a/pkg/backend/logout.go b/pkg/backend/logout.go index 9837ae8f..3793a4e0 100644 --- a/pkg/backend/logout.go +++ b/pkg/backend/logout.go @@ -19,11 +19,13 @@ package backend import ( "context" + "github.com/sirupsen/logrus" "oras.land/oras-go/v2/registry/remote/credentials" ) // Logout logs out of a registry. func (b *backend) Logout(ctx context.Context, registry string) error { + logrus.Infof("logout: starting logout operation for registry %s", registry) // read credentials from docker store. store, err := credentials.NewStoreFromDocker(credentials.StoreOptions{AllowPlaintextPut: true}) if err != nil { @@ -35,5 +37,6 @@ func (b *backend) Logout(ctx context.Context, registry string) error { return err } + logrus.Infof("logout: successfully logged out of registry %s", registry) return nil } diff --git a/pkg/backend/nydusify.go b/pkg/backend/nydusify.go index 5d5a7d93..1a1001ef 100644 --- a/pkg/backend/nydusify.go +++ b/pkg/backend/nydusify.go @@ -20,6 +20,8 @@ import ( "context" "os" "os/exec" + + "github.com/sirupsen/logrus" ) const ( @@ -29,6 +31,7 @@ const ( // Nydusify is a function that converts a given model artifact to a nydus image. func (b *backend) Nydusify(ctx context.Context, source string) (string, error) { + logrus.Infof("nydusify: starting nydusify operation for source %s", source) target := source + nydusImageTagSuffix cmd := exec.CommandContext( ctx, @@ -51,5 +54,6 @@ func (b *backend) Nydusify(ctx context.Context, source string) (string, error) { return "", err } + logrus.Infof("nydusify: successfully nydusified source %s to target %s", source, target) return target, nil } diff --git a/pkg/backend/processor/base.go b/pkg/backend/processor/base.go index 6e57028c..4cd0d76e 100644 --- a/pkg/backend/processor/base.go +++ b/pkg/backend/processor/base.go @@ -51,7 +51,7 @@ type base struct { // Process implements the Processor interface, which can be reused by other processors. func (b *base) Process(ctx context.Context, builder build.Builder, workDir string, opts ...ProcessOption) ([]ocispec.Descriptor, error) { - logrus.Infof("processing %s, mediaType: %s, pattern: %s", b.name, b.mediaType, b.patterns) + logrus.Infof("processor: starting %s processing [mediaType: %s, patterns: %v]", b.name, b.mediaType, b.patterns) processOpts := &processOptions{} for _, opt := range opts { @@ -96,7 +96,7 @@ func (b *base) Process(ctx context.Context, builder build.Builder, workDir strin sort.Strings(matchedPaths) - logrus.Infof("processing %d %s files in total...", len(matchedPaths), b.name) + logrus.Infof("processor: processing %s files [count: %d]", b.name, len(matchedPaths)) var ( mu sync.Mutex @@ -131,7 +131,7 @@ func (b *base) Process(ctx context.Context, builder build.Builder, workDir strin eg.Go(func() error { return retry.Do(func() error { - logrus.Infof("processing %s file, path: %s", b.name, path) + logrus.Debugf("processor: processing %s file %s", b.name, path) desc, err := builder.BuildLayer(ctx, b.mediaType, workDir, path, hooks.NewHooks( hooks.WithOnStart(func(name string, size int64, reader io.Reader) io.Reader { @@ -145,13 +145,13 @@ func (b *base) Process(ctx context.Context, builder build.Builder, workDir strin }), )) if err != nil { - logrus.Errorf("failed to build layer: %v, cancel other build process", err) + logrus.Errorf("processor: failed to build layer for %s file %s: %v", b.name, path, err) cancel() return err } mu.Lock() - logrus.Infof("%s layer %s built successfully, digest: %s, size: %d", b.name, path, desc.Digest, desc.Size) + logrus.Debugf("processor: successfully built %s layer for file %s [digest: %s, size: %d]", b.name, path, desc.Digest, desc.Size) descriptors = append(descriptors, desc) mu.Unlock() @@ -164,7 +164,7 @@ func (b *base) Process(ctx context.Context, builder build.Builder, workDir strin return nil, err } - logrus.Infof("processed %d %s files in total successfully", len(matchedPaths), b.name) + logrus.Infof("processor: successfully processed %s files [count: %d]", b.name, len(matchedPaths)) sort.Slice(descriptors, func(i int, j int) bool { // Sort by filepath by default. @@ -180,7 +180,7 @@ func (b *base) Process(ctx context.Context, builder build.Builder, workDir strin return pathI < pathJ }) - logrus.Debugf("sorted layers: %+v", descriptors) + logrus.Debugf("processor: sorted %s layers [layers: %+v]", b.name, descriptors) return descriptors, nil } diff --git a/pkg/backend/prune.go b/pkg/backend/prune.go index 35590c6d..0b5ac364 100644 --- a/pkg/backend/prune.go +++ b/pkg/backend/prune.go @@ -25,7 +25,7 @@ import ( // Prune prunes the unused blobs and clean up the storage. func (b *backend) Prune(ctx context.Context, dryRun, removeUntagged bool) error { - logrus.Infof("pruning unused blobs and cleaning up storage...") + logrus.Infof("prune: starting prune operation for unused blobs and storage cleanup") if err := b.store.PerformGC(ctx, dryRun, removeUntagged); err != nil { return fmt.Errorf("faile to perform gc: %w", err) @@ -35,7 +35,6 @@ func (b *backend) Prune(ctx context.Context, dryRun, removeUntagged bool) error return fmt.Errorf("failed to perform purge uploads: %w", err) } - logrus.Infof("pruned unused blobs and cleaning up storage successfully") - + logrus.Infof("prune: successfully pruned unused blobs and cleaned up storage") return nil } diff --git a/pkg/backend/pull.go b/pkg/backend/pull.go index b219b716..7ec4ceb0 100644 --- a/pkg/backend/pull.go +++ b/pkg/backend/pull.go @@ -36,10 +36,11 @@ import ( // Pull pulls an artifact from a registry. func (b *backend) Pull(ctx context.Context, target string, cfg *config.Pull) error { - logrus.Infof("pulling artifact %s, cfg: %+v", target, cfg) + logrus.Infof("pull: starting pull operation for target %s [config: %+v]", target, cfg) // pullByDragonfly is called if a Dragonfly endpoint is specified in the configuration. if cfg.DragonflyEndpoint != "" { + logrus.Infof("pull: using dragonfly for target %s", target) return b.pullByDragonfly(ctx, target, cfg) } @@ -67,7 +68,7 @@ func (b *backend) Pull(ctx context.Context, target string, cfg *config.Pull) err return fmt.Errorf("failed to decode the manifest: %w", err) } - logrus.Infof("manifest: %+v", manifest) + logrus.Debugf("pull: loaded manifest for target %s [manifest: %+v]", target, manifest) // TODO: need refactor as currently use a global flag to control the progress bar render. if cfg.DisableProgress { @@ -101,7 +102,7 @@ func (b *backend) Pull(ctx context.Context, target string, cfg *config.Pull) err } } - logrus.Infof("pulling %d layers in total...", len(manifest.Layers)) + logrus.Infof("pull: processing layers for target %s [count: %d]", target, len(manifest.Layers)) for _, layer := range manifest.Layers { g.Go(func() error { select { @@ -111,13 +112,17 @@ func (b *backend) Pull(ctx context.Context, target string, cfg *config.Pull) err } return retry.Do(func() error { - logrus.Infof("pulling layer %s...", layer.Digest) + logrus.Debugf("pull: processing layer %s", layer.Digest) // call the before hook. cfg.Hooks.BeforePullLayer(layer, manifest) err := fn(layer) // call the after hook. cfg.Hooks.AfterPullLayer(layer, err) - logrus.Infof("pulling layer %s done, err: %v", layer.Digest, err) + if err != nil { + logrus.Debugf("pull: failed to process layer %s: %v", layer.Digest, err) + } else { + logrus.Debugf("pull: successfully processed layer %s", layer.Digest) + } return err }, append(defaultRetryOpts, retry.Context(ctx))...) }) @@ -127,7 +132,7 @@ func (b *backend) Pull(ctx context.Context, target string, cfg *config.Pull) err return fmt.Errorf("failed to pull blob to local: %w", err) } - logrus.Infof("pulled %d layers in total successfully", len(manifest.Layers)) + logrus.Infof("pull: successfully processed layers [count: %d]", len(manifest.Layers)) // return earlier if extract from remote is enabled as config and manifest // are not needed for this operation. @@ -156,8 +161,10 @@ func (b *backend) Pull(ctx context.Context, target string, cfg *config.Pull) err if err := exportModelArtifact(ctx, dst, manifest, repo, extractCfg); err != nil { return fmt.Errorf("failed to export the artifact to the output directory: %w", err) } + logrus.Infof("pull: successfully pulled and extracted artifact %s", target) } + logrus.Infof("pull: successfully pulled artifact %s", target) return nil } diff --git a/pkg/backend/pull_by_d7y.go b/pkg/backend/pull_by_d7y.go index 235d60dc..c956b4b0 100644 --- a/pkg/backend/pull_by_d7y.go +++ b/pkg/backend/pull_by_d7y.go @@ -48,7 +48,7 @@ const ( // pullByDragonfly pulls and hardlinks blobs from Dragonfly gRPC service for remote extraction. func (b *backend) pullByDragonfly(ctx context.Context, target string, cfg *config.Pull) error { - logrus.Infof("pulling %s by Dragonfly", target) + logrus.Infof("pull: starting dragonfly pull operation for target %s", target) // Parse reference and initialize remote client. ref, err := ParseReference(target) if err != nil { @@ -73,7 +73,7 @@ func (b *backend) pullByDragonfly(ctx context.Context, target string, cfg *confi return fmt.Errorf("failed to decode manifest: %w", err) } - logrus.Infof("manifest: %+v", manifest) + logrus.Debugf("pull: loaded manifest for target %s [manifest: %+v]", target, manifest) // Get authentication token. authToken, err := getAuthToken(ctx, src, registry, repo) @@ -102,7 +102,7 @@ func (b *backend) pullByDragonfly(ctx context.Context, target string, cfg *confi g, ctx := errgroup.WithContext(ctx) g.SetLimit(cfg.Concurrency) - logrus.Infof("pulling %d layers in total...", len(manifest.Layers)) + logrus.Infof("pull: processing layers via dragonfly [count: %d]", len(manifest.Layers)) for _, layer := range manifest.Layers { g.Go(func() error { select { @@ -111,16 +111,21 @@ func (b *backend) pullByDragonfly(ctx context.Context, target string, cfg *confi default: } - logrus.Infof("pulling layer %s...", layer.Digest) + logrus.Debugf("pull: processing layer %s via dragonfly", layer.Digest) if err := processLayer(ctx, pb, dfdaemon.NewDfdaemonDownloadClient(conn), ref, manifest, layer, authToken, cfg); err != nil { return err } - logrus.Infof("layer %s pulled successfully", layer.Digest) + logrus.Debugf("pull: successfully processed layer %s via dragonfly", layer.Digest) return nil }) } - return g.Wait() + if err := g.Wait(); err != nil { + return err + } + + logrus.Infof("pull: successfully pulled artifact %s via dragonfly", target) + return nil } // getAuthToken retrieves the authentication token for the registry. @@ -230,10 +235,10 @@ func downloadAndExtractLayer(ctx context.Context, pb *internalpb.ProgressBar, cl switch taskResp := resp.Response.(type) { case *dfdaemon.DownloadTaskResponse_DownloadTaskStartedResponse: - logrus.Debugf("received DownloadTaskStartedResponse: %+v", taskResp) + logrus.Debugf("pull: dragonfly download started for layer %s", desc.Digest.String()) pb.Add(internalpb.NormalizePrompt("Pulling blob"), desc.Digest.String(), desc.Size, nil) case *dfdaemon.DownloadTaskResponse_DownloadPieceFinishedResponse: - logrus.Debugf("received DownloadPieceFinishedResponse: %+v", taskResp) + logrus.Debugf("pull: dragonfly download progress for layer %s [piece length: %d]", desc.Digest.String(), taskResp.DownloadPieceFinishedResponse.Piece.Length) if bar := pb.Get(desc.Digest.String()); bar != nil { bar.SetCurrent(bar.Current() + int64(taskResp.DownloadPieceFinishedResponse.Piece.Length)) } diff --git a/pkg/backend/push.go b/pkg/backend/push.go index 690d77f3..18171854 100644 --- a/pkg/backend/push.go +++ b/pkg/backend/push.go @@ -37,7 +37,7 @@ import ( // Push pushes the image to the registry. func (b *backend) Push(ctx context.Context, target string, cfg *config.Push) error { - logrus.Infof("pushing model artifact %s, cfg: %+v", target, cfg) + logrus.Infof("push: starting push operation for target %s [config: %+v]", target, cfg) // parse the repository and tag from the target. ref, err := ParseReference(target) if err != nil { @@ -58,7 +58,7 @@ func (b *backend) Push(ctx context.Context, target string, cfg *config.Push) err return fmt.Errorf("failed to pull the manifest: %w", err) } - logrus.Infof("manifest: %s", string(manifestRaw)) + logrus.Debugf("push: loaded manifest for target %s [manifest: %s]", target, string(manifestRaw)) var manifest ocispec.Manifest if err := json.Unmarshal(manifestRaw, &manifest); err != nil { @@ -80,7 +80,7 @@ func (b *backend) Push(ctx context.Context, target string, cfg *config.Push) err g, ctx := errgroup.WithContext(ctx) g.SetLimit(cfg.Concurrency) - logrus.Infof("pushing %d layers in total...", len(manifest.Layers)) + logrus.Infof("push: processing layers for target %s [count: %d]", target, len(manifest.Layers)) for _, layer := range manifest.Layers { g.Go(func() error { select { @@ -90,11 +90,11 @@ func (b *backend) Push(ctx context.Context, target string, cfg *config.Push) err } return retry.Do(func() error { - logrus.Infof("pushing layer %s...", layer.Digest) + logrus.Debugf("push: processing layer %s", layer.Digest) if err := pushIfNotExist(ctx, pb, internalpb.NormalizePrompt("Copying blob"), src, dst, layer, repo, tag); err != nil { return err } - logrus.Infof("pushed layer %s successfully", layer.Digest) + logrus.Debugf("push: successfully processed layer %s", layer.Digest) return nil }, append(defaultRetryOpts, retry.Context(ctx))...) }) @@ -123,6 +123,7 @@ func (b *backend) Push(ctx context.Context, target string, cfg *config.Push) err return fmt.Errorf("failed to push manifest to remote: %w", err) } + logrus.Infof("push: successfully pushed artifact %s", target) return nil } diff --git a/pkg/backend/rm.go b/pkg/backend/rm.go index 1e90f2dd..a41c1aa9 100644 --- a/pkg/backend/rm.go +++ b/pkg/backend/rm.go @@ -26,7 +26,7 @@ import ( // Remove removes the target from the storage, notice that remove only removes the manifest, // the blobs may still be used by other manifests, so should use prune to remove the unused blobs. func (b *backend) Remove(ctx context.Context, target string) (string, error) { - logrus.Infof("removing target %s", target) + logrus.Infof("remove: starting remove operation for target %s", target) ref, err := ParseReference(target) if err != nil { return "", fmt.Errorf("failed to parse target: %w", err) @@ -47,7 +47,6 @@ func (b *backend) Remove(ctx context.Context, target string) (string, error) { return "", fmt.Errorf("failed to delete manifest %s: %w", reference, err) } - logrus.Infof("manifest %s removed", reference) - + logrus.Infof("remove: successfully removed manifest %s", reference) return reference, nil } diff --git a/pkg/backend/tag.go b/pkg/backend/tag.go index 981a0bbd..9b583c4a 100644 --- a/pkg/backend/tag.go +++ b/pkg/backend/tag.go @@ -27,7 +27,7 @@ import ( // Tag creates a new tag that refers to the source model artifact. func (b *backend) Tag(ctx context.Context, source, target string) error { - logrus.Infof("tagging source artifact %s to target %s", source, target) + logrus.Infof("tag: starting tag operation from source %s to target %s", source, target) srcRef, err := ParseReference(source) if err != nil { return fmt.Errorf("failed to parse source: %w", err) @@ -43,7 +43,7 @@ func (b *backend) Tag(ctx context.Context, source, target string) error { return fmt.Errorf("failed to pull manifest: %w", err) } - logrus.Infof("manifest pulled from source artifact %s", string(manifestRaw)) + logrus.Debugf("tag: loaded manifest from source %s [manifest: %s]", source, string(manifestRaw)) var manifest ocispec.Manifest if err := json.Unmarshal(manifestRaw, &manifest); err != nil { @@ -57,18 +57,17 @@ func (b *backend) Tag(ctx context.Context, source, target string) error { } for _, layer := range layers { - logrus.Infof("mounting blob %s...", layer.Digest.String()) + logrus.Debugf("tag: mounting blob %s", layer.Digest.String()) if err := b.store.MountBlob(ctx, srcRef.Repository(), targetRef.Repository(), layer); err != nil { return fmt.Errorf("failed to mount blob %s: %w", layer.Digest.String(), err) } - logrus.Infof("blob %s mounted successfully", layer.Digest.String()) + logrus.Debugf("tag: successfully mounted blob %s", layer.Digest.String()) } if _, err := b.store.PushManifest(ctx, targetRef.Repository(), targetRef.Tag(), manifestRaw); err != nil { return fmt.Errorf("failed to push manifest: %w", err) } - logrus.Infof("manifest pushed to target artifact %s successfully", target) - + logrus.Infof("tag: successfully tagged source %s to target %s", source, target) return nil } diff --git a/pkg/backend/upload.go b/pkg/backend/upload.go new file mode 100644 index 00000000..c5d321e3 --- /dev/null +++ b/pkg/backend/upload.go @@ -0,0 +1,58 @@ +/* + * Copyright 2025 The CNAI Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package backend + +import ( + "context" + "fmt" + + "github.com/sirupsen/logrus" + + internalpb "github.com/CloudNativeAI/modctl/internal/pb" + "github.com/CloudNativeAI/modctl/pkg/backend/build" + "github.com/CloudNativeAI/modctl/pkg/backend/processor" + "github.com/CloudNativeAI/modctl/pkg/config" +) + +// Upload uploads the file to a model artifact repository in advance, but will not push config and manifest. +func (b *backend) Upload(ctx context.Context, filepath string, cfg *config.Upload) error { + logrus.Infof("upload: starting upload operation for file %s [repository: %s]", filepath, cfg.Repo) + proc := b.getProcessor(filepath, cfg.Raw) + if proc == nil { + return fmt.Errorf("failed to get processor for file %s", filepath) + } + + opts := []build.Option{ + build.WithPlainHTTP(cfg.PlainHTTP), + build.WithInsecure(cfg.Insecure), + } + builder, err := build.NewBuilder(build.OutputTypeRemote, b.store, cfg.Repo, "", opts...) + if err != nil { + return fmt.Errorf("failed to create builder: %w", err) + } + + pb := internalpb.NewProgressBar() + pb.Start() + defer pb.Stop() + + if _, err = proc.Process(ctx, builder, ".", processor.WithProgressTracker(pb)); err != nil { + return fmt.Errorf("failed to process layers: %w", err) + } + + logrus.Infof("upload: successfully uploaded file %s [repository: %s]", filepath, cfg.Repo) + return nil +} diff --git a/pkg/codec/codec.go b/pkg/codec/codec.go index 537507db..4b4105f5 100644 --- a/pkg/codec/codec.go +++ b/pkg/codec/codec.go @@ -72,3 +72,8 @@ func TypeFromMediaType(mediaType string) Type { return "" } + +// IsRawMediaType returns true if the media type is raw. +func IsRawMediaType(mediaType string) bool { + return strings.HasSuffix(mediaType, ".raw") +} diff --git a/pkg/config/upload.go b/pkg/config/upload.go new file mode 100644 index 00000000..df3187b9 --- /dev/null +++ b/pkg/config/upload.go @@ -0,0 +1,43 @@ +/* + * Copyright 2025 The CNAI Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +import "errors" + +type Upload struct { + Repo string + PlainHTTP bool + Insecure bool + Raw bool +} + +func NewUpload() *Upload { + return &Upload{ + Repo: "", + PlainHTTP: false, + Insecure: false, + Raw: false, + } +} + +func (u *Upload) Validate() error { + if u.Repo == "" { + return errors.New("repo is required") + } + + return nil +}