diff --git a/README.md b/README.md index e7b4125a..2a04af65 100644 --- a/README.md +++ b/README.md @@ -104,6 +104,12 @@ services: ports: - "9898:9898" restart: unless-stopped + # Optional, these are required for fast repo browsing mode using FUSE + # but are not required for the default repo browsing mode using `restic ls` + cap_add: + - SYS_ADMIN + devices: + - /dev/fuse:/dev/fuse ``` ## Running on Linux diff --git a/internal/orchestrator/repo/repo.go b/internal/orchestrator/repo/repo.go index a509a3bc..345775df 100644 --- a/internal/orchestrator/repo/repo.go +++ b/internal/orchestrator/repo/repo.go @@ -5,7 +5,9 @@ import ( "errors" "fmt" "io" + "os" "path" + "path/filepath" "runtime" "slices" "sort" @@ -30,6 +32,12 @@ type RepoOrchestrator struct { config *v1.Config repoConfig *v1.Repo repo *restic.Repo + + mountMu sync.Mutex + mountErr error + mountDir string + mountTimer *time.Timer + mountUsers int } // NewRepoOrchestrator accepts a config and a repo that is configured with the properties of that config object. @@ -86,6 +94,69 @@ func (r *RepoOrchestrator) logger(ctx context.Context) *zap.Logger { return logging.Logger(ctx, "[repo-manager] ").With(zap.String("repo", r.repoConfig.Id)) } +func (r *RepoOrchestrator) Mount(ctx context.Context) (string, func(), error) { + // Ensure that the mount stays alive at least as long as the context of this function call. + r.mountMu.Lock() + defer r.mountMu.Unlock() + + // If mount has failed in the past, don't retry just return the error. + if r.mountErr != nil { + return "", func() {}, r.mountErr + } + + if r.mountTimer == nil { + // Try to mount the repo + var err error + r.mountDir, err = os.MkdirTemp("", "restic-"+r.repoConfig.Id+"-mount") + if err != nil { + r.logger(ctx).Warn("failed to create mount directory", zap.Error(err)) + r.mountErr = fmt.Errorf("create mount directory: %w", err) + return "", func() {}, r.mountErr + } + + mountCtx, cancel := context.WithCancel(context.Background()) + if err := r.repo.Mount(mountCtx, r.mountDir, 60*time.Second); err != nil { + r.logger(ctx).Warn("failed to mount repo", zap.Error(err)) + r.mountErr = fmt.Errorf("mount repo %v: %w", r.repoConfig.Id, err) + cancel() + os.Remove(r.mountDir) + return "", func() {}, r.mountErr + } + r.mountTimer = time.AfterFunc(1000*time.Hour, func() { + r.mountMu.Lock() + defer r.mountMu.Unlock() + r.mountTimer = nil + cancel() + + // Try to remove the mount directory up to 10 times with a 1 second delay between each attempt. + for i := 0; i < 10; i++ { + if err := os.Remove(r.mountDir); err != nil { + r.logger(ctx).Warn("failed to remove mount directory", zap.String("dir", r.mountDir), zap.Error(err)) + time.Sleep(1 * time.Second) + continue + } else { + r.logger(ctx).Debug("removed mount directory", zap.String("dir", r.mountDir)) + } + break + } + }) + r.mountTimer.Stop() + } + r.mountUsers++ + + var once sync.Once + return r.mountDir, func() { + once.Do(func() { + r.mountMu.Lock() + defer r.mountMu.Unlock() + r.mountUsers-- + if r.mountUsers == 0 { + r.mountTimer.Reset(60 * time.Second) + } + }) + }, nil +} + func (r *RepoOrchestrator) Exists(ctx context.Context) error { return r.repo.Exists(ctx) } @@ -188,6 +259,52 @@ func (r *RepoOrchestrator) ListSnapshotFiles(ctx context.Context, snapshotId str ctx, flush := forwardResticLogs(ctx) defer flush() + if runtime.GOOS != "windows" { + mountCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + mountDir, releaseMount, mountErr := r.Mount(mountCtx) + if mountErr != nil { + r.logger(ctx).Warn("failed to mount repo, falling back to restic ls", zap.Error(mountErr)) + } else { + defer releaseMount() + + // List the directory using os.ReadDir + readPath := filepath.Join(mountDir, "ids", snapshotId[:8], path) + entries, err := os.ReadDir(readPath) + if err != nil { + r.logger(ctx).Warn("failed to list directory", zap.Error(err), zap.String("path", readPath)) + return nil, fmt.Errorf("failed to list directory: %w", err) + } + + lsEnts := make([]*v1.LsEntry, 0, len(entries)) + for _, entry := range entries { + info, err := entry.Info() + if err != nil { + continue + } + + typeName := "file" + if entry.IsDir() { + typeName = "directory" + } + + lsEnts = append(lsEnts, &v1.LsEntry{ + Name: entry.Name(), + Type: typeName, + Path: filepath.Join(path, entry.Name()), + Size: info.Size(), + Mode: int64(info.Mode()), + Mtime: info.ModTime().Format(time.RFC3339), + Atime: info.ModTime().Format(time.RFC3339), + Ctime: info.ModTime().Format(time.RFC3339), + }) + } + + return lsEnts, nil + } + } + _, entries, err := r.repo.ListDirectory(ctx, snapshotId, path) if err != nil { return nil, fmt.Errorf("failed to list snapshot files: %w", err) diff --git a/pkg/restic/restic.go b/pkg/restic/restic.go index d039634b..1de61f42 100644 --- a/pkg/restic/restic.go +++ b/pkg/restic/restic.go @@ -9,10 +9,12 @@ import ( "io" "os" "os/exec" + "path/filepath" "runtime" "slices" "strings" "sync" + "time" "unicode" "github.com/djherbis/buffer" @@ -466,6 +468,54 @@ func (r *Repo) Stats(ctx context.Context, opts ...GenericOption) (*RepoStats, er return &stats, nil } +func (r *Repo) Mount(ctx context.Context, dir string, mountTimeout time.Duration, opts ...GenericOption) error { + // Check if already mounted + if info, err := os.Stat(dir); err == nil { + if info.IsDir() { + // Try to detect if it's already a FUSE mount by checking if we can list it + if entries, err := os.ReadDir(dir); err == nil && len(entries) > 0 { + return fmt.Errorf("directory %s appears to already be mounted", dir) + } + } + } + + // Start the mount command in background + errorCollector := errorMessageCollector{} + cmd := r.commandWithContext(ctx, []string{"mount", dir}, opts...) + r.handleOutput(cmd, withAllTo(&errorCollector), withLogWriterFromContext(ctx)) + if err := cmd.Start(); err != nil { + return errorCollector.AddCmdOutputToError(cmd, err) + } + + // Wait for mount to become available + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + timeout := time.NewTimer(mountTimeout) + defer timeout.Stop() + + for { + select { + case <-ctx.Done(): + cmd.Process.Kill() + return ctx.Err() + case <-timeout.C: + cmd.Process.Kill() + return fmt.Errorf("mount timeout") + case <-ticker.C: + if _, err := os.Stat(dir); err == nil { + if ents, err := os.ReadDir(filepath.Join(dir, "ids")); err == nil && len(ents) > 0 { + return nil + } + } + + if cmd.ProcessState != nil && cmd.ProcessState.Exited() { + return errorCollector.AddCmdOutputToError(cmd, fmt.Errorf("restic mount process exited with code %d", cmd.ProcessState.ExitCode())) + } + } + } +} + // AddTags adds tags to the specified snapshots. func (r *Repo) AddTags(ctx context.Context, snapshotIDs []string, tags []string, opts ...GenericOption) error { args := []string{"tag"}