Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ services:
ports:
- "9898:9898"
restart: unless-stopped
# Optional, these are required for fast repo browsing mode using FUSE
# but are not required for the default repo browsing mode using `restic ls`
cap_add:
- SYS_ADMIN
devices:
- /dev/fuse:/dev/fuse
```

## Running on Linux
Expand Down
117 changes: 117 additions & 0 deletions internal/orchestrator/repo/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"runtime"
"slices"
"sort"
Expand All @@ -30,6 +32,12 @@ type RepoOrchestrator struct {
config *v1.Config
repoConfig *v1.Repo
repo *restic.Repo

mountMu sync.Mutex
mountErr error
mountDir string
mountTimer *time.Timer
mountUsers int
}

// NewRepoOrchestrator accepts a config and a repo that is configured with the properties of that config object.
Expand Down Expand Up @@ -86,6 +94,69 @@ func (r *RepoOrchestrator) logger(ctx context.Context) *zap.Logger {
return logging.Logger(ctx, "[repo-manager] ").With(zap.String("repo", r.repoConfig.Id))
}

func (r *RepoOrchestrator) Mount(ctx context.Context) (string, func(), error) {
// Ensure that the mount stays alive at least as long as the context of this function call.
r.mountMu.Lock()
defer r.mountMu.Unlock()

// If mount has failed in the past, don't retry just return the error.
if r.mountErr != nil {
return "", func() {}, r.mountErr
}

if r.mountTimer == nil {
// Try to mount the repo
var err error
r.mountDir, err = os.MkdirTemp("", "restic-"+r.repoConfig.Id+"-mount")
if err != nil {
r.logger(ctx).Warn("failed to create mount directory", zap.Error(err))
r.mountErr = fmt.Errorf("create mount directory: %w", err)
return "", func() {}, r.mountErr
}

mountCtx, cancel := context.WithCancel(context.Background())
if err := r.repo.Mount(mountCtx, r.mountDir, 60*time.Second); err != nil {
r.logger(ctx).Warn("failed to mount repo", zap.Error(err))
r.mountErr = fmt.Errorf("mount repo %v: %w", r.repoConfig.Id, err)
cancel()
os.Remove(r.mountDir)
return "", func() {}, r.mountErr
}
r.mountTimer = time.AfterFunc(1000*time.Hour, func() {
r.mountMu.Lock()
defer r.mountMu.Unlock()
r.mountTimer = nil
cancel()

// Try to remove the mount directory up to 10 times with a 1 second delay between each attempt.
for i := 0; i < 10; i++ {
if err := os.Remove(r.mountDir); err != nil {
r.logger(ctx).Warn("failed to remove mount directory", zap.String("dir", r.mountDir), zap.Error(err))
time.Sleep(1 * time.Second)
continue
} else {
r.logger(ctx).Debug("removed mount directory", zap.String("dir", r.mountDir))
}
break
}
})
r.mountTimer.Stop()
}
r.mountUsers++

var once sync.Once
return r.mountDir, func() {
once.Do(func() {
r.mountMu.Lock()
defer r.mountMu.Unlock()
r.mountUsers--
if r.mountUsers == 0 {
r.mountTimer.Reset(60 * time.Second)
}
})
}, nil
}

func (r *RepoOrchestrator) Exists(ctx context.Context) error {
return r.repo.Exists(ctx)
}
Expand Down Expand Up @@ -188,6 +259,52 @@ func (r *RepoOrchestrator) ListSnapshotFiles(ctx context.Context, snapshotId str
ctx, flush := forwardResticLogs(ctx)
defer flush()

if runtime.GOOS != "windows" {
mountCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

mountDir, releaseMount, mountErr := r.Mount(mountCtx)
if mountErr != nil {
r.logger(ctx).Warn("failed to mount repo, falling back to restic ls", zap.Error(mountErr))
} else {
defer releaseMount()

// List the directory using os.ReadDir
readPath := filepath.Join(mountDir, "ids", snapshotId[:8], path)
entries, err := os.ReadDir(readPath)
if err != nil {
r.logger(ctx).Warn("failed to list directory", zap.Error(err), zap.String("path", readPath))
return nil, fmt.Errorf("failed to list directory: %w", err)
}

lsEnts := make([]*v1.LsEntry, 0, len(entries))
for _, entry := range entries {
info, err := entry.Info()
if err != nil {
continue
}

typeName := "file"
if entry.IsDir() {
typeName = "directory"
}

lsEnts = append(lsEnts, &v1.LsEntry{
Name: entry.Name(),
Type: typeName,
Path: filepath.Join(path, entry.Name()),
Size: info.Size(),
Mode: int64(info.Mode()),
Mtime: info.ModTime().Format(time.RFC3339),
Atime: info.ModTime().Format(time.RFC3339),
Ctime: info.ModTime().Format(time.RFC3339),
})
}

return lsEnts, nil
}
}

_, entries, err := r.repo.ListDirectory(ctx, snapshotId, path)
if err != nil {
return nil, fmt.Errorf("failed to list snapshot files: %w", err)
Expand Down
50 changes: 50 additions & 0 deletions pkg/restic/restic.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import (
"io"
"os"
"os/exec"
"path/filepath"
"runtime"
"slices"
"strings"
"sync"
"time"
"unicode"

"github.com/djherbis/buffer"
Expand Down Expand Up @@ -466,6 +468,54 @@ func (r *Repo) Stats(ctx context.Context, opts ...GenericOption) (*RepoStats, er
return &stats, nil
}

func (r *Repo) Mount(ctx context.Context, dir string, mountTimeout time.Duration, opts ...GenericOption) error {
// Check if already mounted
if info, err := os.Stat(dir); err == nil {
if info.IsDir() {
// Try to detect if it's already a FUSE mount by checking if we can list it
if entries, err := os.ReadDir(dir); err == nil && len(entries) > 0 {
return fmt.Errorf("directory %s appears to already be mounted", dir)
}
}
}

// Start the mount command in background
errorCollector := errorMessageCollector{}
cmd := r.commandWithContext(ctx, []string{"mount", dir}, opts...)
r.handleOutput(cmd, withAllTo(&errorCollector), withLogWriterFromContext(ctx))
if err := cmd.Start(); err != nil {
return errorCollector.AddCmdOutputToError(cmd, err)
}

// Wait for mount to become available
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

timeout := time.NewTimer(mountTimeout)
defer timeout.Stop()

for {
select {
case <-ctx.Done():
cmd.Process.Kill()
return ctx.Err()
case <-timeout.C:
cmd.Process.Kill()
return fmt.Errorf("mount timeout")
case <-ticker.C:
if _, err := os.Stat(dir); err == nil {
if ents, err := os.ReadDir(filepath.Join(dir, "ids")); err == nil && len(ents) > 0 {
return nil
}
}

if cmd.ProcessState != nil && cmd.ProcessState.Exited() {
return errorCollector.AddCmdOutputToError(cmd, fmt.Errorf("restic mount process exited with code %d", cmd.ProcessState.ExitCode()))
}
}
}
}

// AddTags adds tags to the specified snapshots.
func (r *Repo) AddTags(ctx context.Context, snapshotIDs []string, tags []string, opts ...GenericOption) error {
args := []string{"tag"}
Expand Down
Loading