diff --git a/packages/orchestrator/pkg/sandbox/block/cache.go b/packages/orchestrator/pkg/sandbox/block/cache.go index d9690b1de0..7bf39d7a20 100644 --- a/packages/orchestrator/pkg/sandbox/block/cache.go +++ b/packages/orchestrator/pkg/sandbox/block/cache.go @@ -8,7 +8,6 @@ import ( "math" "math/rand" "os" - "slices" "sync" "sync/atomic" "syscall" @@ -17,9 +16,13 @@ import ( "github.com/bits-and-blooms/bitset" "github.com/edsrzf/mmap-go" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.uber.org/zap" "golang.org/x/sys/unix" + "github.com/e2b-dev/infra/packages/shared/pkg/logger" "github.com/e2b-dev/infra/packages/shared/pkg/storage/header" + "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" ) const ( @@ -82,7 +85,7 @@ func NewCache(size, blockSize int64, filePath string, dirtyFile bool) (*Cache, e return nil, fmt.Errorf("size too big: %d > %d", size, math.MaxInt) } - mm, err := mmap.MapRegion(f, int(size), unix.PROT_READ|unix.PROT_WRITE, 0, 0) + mm, err := mmap.MapRegion(f, int(size), mmap.RDWR, 0, 0) if err != nil { return nil, fmt.Errorf("error mapping file: %w", err) } @@ -100,27 +103,7 @@ func (c *Cache) isClosed() bool { return c.closed.Load() } -func (c *Cache) Sync() error { - c.mu.Lock() - defer c.mu.Unlock() - - if c.isClosed() { - return NewErrCacheClosed(c.filePath) - } - - if c.mmap == nil { - return nil - } - - err := c.mmap.Flush() - if err != nil { - return fmt.Errorf("error syncing cache: %w", err) - } - - return nil -} - -func (c *Cache) ExportToDiff(ctx context.Context, out io.Writer) (*header.DiffMetadata, error) { +func (c *Cache) ExportToDiff(ctx context.Context, out *os.File) (*header.DiffMetadata, error) { ctx, childSpan := tracer.Start(ctx, "export-to-diff") defer childSpan.End() @@ -139,23 +122,96 @@ func (c *Cache) ExportToDiff(ctx context.Context, out io.Writer) (*header.DiffMe }, nil } - err := c.mmap.Flush() + f, err := os.Open(c.filePath) + if err != nil { + return nil, fmt.Errorf("error opening file: %w", err) + } + defer f.Close() + + src := int(f.Fd()) + + // Explicit mmap flush is not necessary, because the kernel will handle that as part of the copy_file_range syscall. + // Calling sync_file_range marks the range for writeback and starts it early. + // This is just an optimization, so if it fails just log a warning and let copy_file_range do the actual work. + err = unix.SyncFileRange(src, 0, c.size, unix.SYNC_FILE_RANGE_WRITE) if err != nil { - return nil, fmt.Errorf("error flushing mmap: %w", err) + logger.L().Warn(ctx, "error syncing file", zap.Error(err)) } + buildStart := time.Now() builder := header.NewDiffMetadataBuilder(c.size, c.blockSize) - for _, offset := range c.dirtySortedKeys() { - block := (*c.mmap)[offset : offset+c.blockSize] + // We don't need to sort the keys as the bitset handles the ordering. + c.dirty.Range(func(key, _ any) bool { + builder.AddDirtyOffset(key.(int64)) - err := builder.Process(ctx, block, out, offset) - if err != nil { - return nil, fmt.Errorf("error processing block %d: %w", offset, err) + return true + }) + + diffMetadata := builder.Build() + telemetry.SetAttributes(ctx, attribute.Int64("build_metadata_ms", time.Since(buildStart).Milliseconds())) + + dst := int(out.Fd()) + var writeOffset int64 + var totalRanges int64 + fallback := false + + copyStart := time.Now() + for r := range BitsetRanges(diffMetadata.Dirty, diffMetadata.BlockSize) { + totalRanges++ + remaining := int(r.Size) + readOffset := r.Start + + // The kernel may return short writes (e.g. capped at MAX_RW_COUNT on non-reflink filesystems), + // so we loop until the full range is copied. The offset pointers are advanced by the kernel. + for remaining > 0 { + if !fallback { + // On XFS this uses reflink automatically. + n, err := unix.CopyFileRange( + src, + &readOffset, + dst, + &writeOffset, + remaining, + 0, + ) + switch { + case errors.Is(err, syscall.EXDEV) || errors.Is(err, syscall.EOPNOTSUPP) || errors.Is(err, syscall.ENOSYS): + fallback = true + logger.L().Warn(ctx, "copy_file_range unsupported, falling back to normal copy", zap.Error(err)) + case err != nil: + return nil, fmt.Errorf("error copying file range: %w", err) + case n == 0: + return nil, fmt.Errorf("copy_file_range returned 0 with %d bytes remaining", remaining) + default: + remaining -= n + } + } + + // CopyFileRange failed. Falling back to normal copy + if fallback && remaining > 0 { + if _, err := out.Seek(writeOffset, io.SeekStart); err != nil { + return nil, fmt.Errorf("error seeking: %w", err) + } + sr := io.NewSectionReader(f, readOffset, int64(remaining)) + if _, err := io.Copy(out, sr); err != nil { + return nil, fmt.Errorf("error copying file range. %w", err) + } + + writeOffset += int64(remaining) + remaining = 0 + } } } - return builder.Build(), nil + telemetry.SetAttributes(ctx, + attribute.Int64("copy_ms", time.Since(copyStart).Milliseconds()), + attribute.Int64("total_size_bytes", c.size), + attribute.Int64("dirty_size_bytes", int64(diffMetadata.Dirty.Count())*c.blockSize), + attribute.Int64("total_ranges", totalRanges), + ) + + return diffMetadata, nil } func (c *Cache) ReadAt(b []byte, off int64) (int, error) { @@ -291,20 +347,6 @@ func (c *Cache) WriteAtWithoutLock(b []byte, off int64) (int, error) { return n, nil } -// dirtySortedKeys returns a sorted list of dirty keys. -// Key represents a block offset. -func (c *Cache) dirtySortedKeys() []int64 { - var keys []int64 - c.dirty.Range(func(key, _ any) bool { - keys = append(keys, key.(int64)) - - return true - }) - slices.Sort(keys) - - return keys -} - // FileSize returns the size of the cache on disk. // The size might differ from the dirty size, as it may not be fully on disk. func (c *Cache) FileSize() (int64, error) { diff --git a/packages/orchestrator/pkg/sandbox/diffcreator.go b/packages/orchestrator/pkg/sandbox/diffcreator.go index a55221d984..bd4a27e30a 100644 --- a/packages/orchestrator/pkg/sandbox/diffcreator.go +++ b/packages/orchestrator/pkg/sandbox/diffcreator.go @@ -2,14 +2,14 @@ package sandbox import ( "context" - "io" + "os" "github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/rootfs" "github.com/e2b-dev/infra/packages/shared/pkg/storage/header" ) type DiffCreator interface { - process(ctx context.Context, out io.Writer) (*header.DiffMetadata, error) + process(ctx context.Context, out *os.File) (*header.DiffMetadata, error) } type RootfsDiffCreator struct { @@ -17,6 +17,6 @@ type RootfsDiffCreator struct { closeHook func(context.Context) error } -func (r *RootfsDiffCreator) process(ctx context.Context, out io.Writer) (*header.DiffMetadata, error) { +func (r *RootfsDiffCreator) process(ctx context.Context, out *os.File) (*header.DiffMetadata, error) { return r.rootfs.ExportDiff(ctx, out, r.closeHook) } diff --git a/packages/orchestrator/pkg/sandbox/rootfs/direct.go b/packages/orchestrator/pkg/sandbox/rootfs/direct.go index edea9b73ef..13bc98cb4d 100644 --- a/packages/orchestrator/pkg/sandbox/rootfs/direct.go +++ b/packages/orchestrator/pkg/sandbox/rootfs/direct.go @@ -69,7 +69,7 @@ func (o *DirectProvider) Start(_ context.Context) error { func (o *DirectProvider) ExportDiff( ctx context.Context, - out io.Writer, + out *os.File, stopSandbox func(context.Context) error, ) (*header.DiffMetadata, error) { ctx, childSpan := tracer.Start(ctx, "direct-provider-export") diff --git a/packages/orchestrator/pkg/sandbox/rootfs/nbd.go b/packages/orchestrator/pkg/sandbox/rootfs/nbd.go index a29baf9e5a..6fa0f85ac2 100644 --- a/packages/orchestrator/pkg/sandbox/rootfs/nbd.go +++ b/packages/orchestrator/pkg/sandbox/rootfs/nbd.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "io" "os" "go.uber.org/zap" @@ -71,7 +70,7 @@ func (o *NBDProvider) Start(ctx context.Context) error { func (o *NBDProvider) ExportDiff( ctx context.Context, - out io.Writer, + out *os.File, closeSandbox func(ctx context.Context) error, ) (*header.DiffMetadata, error) { ctx, span := tracer.Start(ctx, "cow-export") @@ -93,12 +92,26 @@ func (o *NBDProvider) ExportDiff( select { case <-o.finishedOperations: case <-ctx.Done(): + // Close the cache to avoid leaking the mmaped memory. Log an error + // if that failed + closeErr := cache.Close() + if closeErr != nil { + logger.L().Warn(ctx, "error closing cache", zap.Error(closeErr)) + } + return nil, fmt.Errorf("timeout waiting for overlay device to be released") } telemetry.ReportEvent(ctx, "sandbox stopped") m, err := cache.ExportToDiff(ctx, out) if err != nil { + // Close the cache to avoid leaking the mmaped memory. Log an error + // if that failed + closeErr := cache.Close() + if closeErr != nil { + logger.L().Warn(ctx, "error closing cache", zap.Error(closeErr)) + } + return nil, fmt.Errorf("error exporting cache: %w", err) } diff --git a/packages/orchestrator/pkg/sandbox/rootfs/rootfs.go b/packages/orchestrator/pkg/sandbox/rootfs/rootfs.go index b6802a3a26..b3940536b6 100644 --- a/packages/orchestrator/pkg/sandbox/rootfs/rootfs.go +++ b/packages/orchestrator/pkg/sandbox/rootfs/rootfs.go @@ -3,7 +3,6 @@ package rootfs import ( "context" "fmt" - "io" "os" "syscall" @@ -22,7 +21,7 @@ type Provider interface { Start(ctx context.Context) error Close(ctx context.Context) error Path() (string, error) - ExportDiff(ctx context.Context, out io.Writer, closeSandbox func(context.Context) error) (*header.DiffMetadata, error) + ExportDiff(ctx context.Context, out *os.File, closeSandbox func(context.Context) error) (*header.DiffMetadata, error) } // flush flushes the data to the operating system's buffer. diff --git a/packages/orchestrator/pkg/sandbox/sandbox.go b/packages/orchestrator/pkg/sandbox/sandbox.go index a03d94efb3..3e4048d8c6 100644 --- a/packages/orchestrator/pkg/sandbox/sandbox.go +++ b/packages/orchestrator/pkg/sandbox/sandbox.go @@ -1194,7 +1194,7 @@ func pauseProcessRootfs( return nil, nil, fmt.Errorf("failed to create rootfs diff: %w", err) } - rootfsDiffMetadata, err := diffCreator.process(ctx, rootfsDiffFile) + rootfsDiffMetadata, err := diffCreator.process(ctx, rootfsDiffFile.File) if err != nil { err = errors.Join(err, rootfsDiffFile.Close()) diff --git a/packages/shared/pkg/storage/header/metadata.go b/packages/shared/pkg/storage/header/metadata.go index 32dac10d19..574dea78bf 100644 --- a/packages/shared/pkg/storage/header/metadata.go +++ b/packages/shared/pkg/storage/header/metadata.go @@ -114,6 +114,7 @@ type DiffMetadataBuilder struct { func NewDiffMetadataBuilder(size, blockSize int64) *DiffMetadataBuilder { return &DiffMetadataBuilder{ + // TODO: We might be able to start with 0 as preallocating here actually takes space. dirty: bitset.New(uint(TotalBlocks(size, blockSize))), empty: bitset.New(0), @@ -121,6 +122,10 @@ func NewDiffMetadataBuilder(size, blockSize int64) *DiffMetadataBuilder { } } +func (b *DiffMetadataBuilder) AddDirtyOffset(offset int64) { + b.dirty.Set(uint(BlockIdx(offset, b.blockSize))) +} + func (b *DiffMetadataBuilder) Process(ctx context.Context, block []byte, out io.Writer, offset int64) error { blockIdx := BlockIdx(offset, b.blockSize)