Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 87 additions & 45 deletions packages/orchestrator/pkg/sandbox/block/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"math"
"math/rand"
"os"
"slices"
"sync"
"sync/atomic"
"syscall"
Expand All @@ -17,9 +16,13 @@ import (
"github.com/bits-and-blooms/bitset"
"github.com/edsrzf/mmap-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/zap"
"golang.org/x/sys/unix"

"github.com/e2b-dev/infra/packages/shared/pkg/logger"
"github.com/e2b-dev/infra/packages/shared/pkg/storage/header"
"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
)

const (
Expand Down Expand Up @@ -82,7 +85,7 @@ func NewCache(size, blockSize int64, filePath string, dirtyFile bool) (*Cache, e
return nil, fmt.Errorf("size too big: %d > %d", size, math.MaxInt)
}

mm, err := mmap.MapRegion(f, int(size), unix.PROT_READ|unix.PROT_WRITE, 0, 0)
mm, err := mmap.MapRegion(f, int(size), mmap.RDWR, 0, 0)
if err != nil {
return nil, fmt.Errorf("error mapping file: %w", err)
}
Expand All @@ -100,27 +103,7 @@ func (c *Cache) isClosed() bool {
return c.closed.Load()
}

// Sync flushes the mmapped cache contents to the backing file on disk.
// It fails with a cache-closed error when the cache has been closed, and
// is a no-op when no region is currently mapped.
func (c *Cache) Sync() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.isClosed() {
		return NewErrCacheClosed(c.filePath)
	}

	// Nothing mapped means nothing to flush.
	if c.mmap == nil {
		return nil
	}

	if err := c.mmap.Flush(); err != nil {
		return fmt.Errorf("error syncing cache: %w", err)
	}

	return nil
}

func (c *Cache) ExportToDiff(ctx context.Context, out io.Writer) (*header.DiffMetadata, error) {
func (c *Cache) ExportToDiff(ctx context.Context, out *os.File) (*header.DiffMetadata, error) {
ctx, childSpan := tracer.Start(ctx, "export-to-diff")
defer childSpan.End()

Expand All @@ -139,23 +122,96 @@ func (c *Cache) ExportToDiff(ctx context.Context, out io.Writer) (*header.DiffMe
}, nil
}

err := c.mmap.Flush()
f, err := os.Open(c.filePath)
if err != nil {
return nil, fmt.Errorf("error opening file: %w", err)
}
defer f.Close()

src := int(f.Fd())

// Explicit mmap flush is not necessary, because the kernel will handle that as part of the copy_file_range syscall.
// Calling sync_file_range marks the range for writeback and starts it early.
// This is just an optimization, so if it fails just log a warning and let copy_file_range do the actual work.
err = unix.SyncFileRange(src, 0, c.size, unix.SYNC_FILE_RANGE_WRITE)
if err != nil {
return nil, fmt.Errorf("error flushing mmap: %w", err)
logger.L().Warn(ctx, "error syncing file", zap.Error(err))
}

buildStart := time.Now()
builder := header.NewDiffMetadataBuilder(c.size, c.blockSize)

for _, offset := range c.dirtySortedKeys() {
block := (*c.mmap)[offset : offset+c.blockSize]
// We don't need to sort the keys as the bitset handles the ordering.
c.dirty.Range(func(key, _ any) bool {
builder.AddDirtyOffset(key.(int64))

err := builder.Process(ctx, block, out, offset)
if err != nil {
return nil, fmt.Errorf("error processing block %d: %w", offset, err)
return true
})

diffMetadata := builder.Build()
telemetry.SetAttributes(ctx, attribute.Int64("build_metadata_ms", time.Since(buildStart).Milliseconds()))

dst := int(out.Fd())
var writeOffset int64
var totalRanges int64
fallback := false

copyStart := time.Now()
for r := range BitsetRanges(diffMetadata.Dirty, diffMetadata.BlockSize) {
totalRanges++
remaining := int(r.Size)
readOffset := r.Start

// The kernel may return short writes (e.g. capped at MAX_RW_COUNT on non-reflink filesystems),
// so we loop until the full range is copied. The offset pointers are advanced by the kernel.
for remaining > 0 {
if !fallback {
// On XFS this uses reflink automatically.
n, err := unix.CopyFileRange(
src,
&readOffset,
dst,
&writeOffset,
remaining,
0,
)
switch {
case errors.Is(err, syscall.EXDEV) || errors.Is(err, syscall.EOPNOTSUPP) || errors.Is(err, syscall.ENOSYS):
fallback = true
logger.L().Warn(ctx, "copy_file_range unsupported, falling back to normal copy", zap.Error(err))
case err != nil:
return nil, fmt.Errorf("error copying file range: %w", err)
case n == 0:
return nil, fmt.Errorf("copy_file_range returned 0 with %d bytes remaining", remaining)
default:
remaining -= n
}
}

// CopyFileRange failed. Falling back to normal copy
if fallback && remaining > 0 {
if _, err := out.Seek(writeOffset, io.SeekStart); err != nil {
return nil, fmt.Errorf("error seeking: %w", err)
}
sr := io.NewSectionReader(f, readOffset, int64(remaining))
if _, err := io.Copy(out, sr); err != nil {
return nil, fmt.Errorf("error copying file range. %w", err)
}

writeOffset += int64(remaining)
remaining = 0
}
}
}

return builder.Build(), nil
telemetry.SetAttributes(ctx,
attribute.Int64("copy_ms", time.Since(copyStart).Milliseconds()),
attribute.Int64("total_size_bytes", c.size),
attribute.Int64("dirty_size_bytes", int64(diffMetadata.Dirty.Count())*c.blockSize),
attribute.Int64("total_ranges", totalRanges),
)

return diffMetadata, nil
}

func (c *Cache) ReadAt(b []byte, off int64) (int, error) {
Expand Down Expand Up @@ -291,20 +347,6 @@ func (c *Cache) WriteAtWithoutLock(b []byte, off int64) (int, error) {
return n, nil
}

// dirtySortedKeys returns every dirty block offset tracked by the cache,
// sorted in ascending order. Each key is the byte offset of a dirty block.
func (c *Cache) dirtySortedKeys() []int64 {
	var offsets []int64
	c.dirty.Range(func(k, _ any) bool {
		offsets = append(offsets, k.(int64))

		return true
	})
	slices.Sort(offsets)

	return offsets
}

// FileSize returns the size of the cache on disk.
// The size might differ from the dirty size, as it may not be fully on disk.
func (c *Cache) FileSize() (int64, error) {
Expand Down
6 changes: 3 additions & 3 deletions packages/orchestrator/pkg/sandbox/diffcreator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ package sandbox

import (
"context"
"io"
"os"

"github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/rootfs"
"github.com/e2b-dev/infra/packages/shared/pkg/storage/header"
)

type DiffCreator interface {
process(ctx context.Context, out io.Writer) (*header.DiffMetadata, error)
process(ctx context.Context, out *os.File) (*header.DiffMetadata, error)
}

// RootfsDiffCreator builds a rootfs diff by delegating to the underlying
// rootfs provider's export.
type RootfsDiffCreator struct {
	// rootfs is the provider whose changed blocks are exported.
	rootfs rootfs.Provider
	// closeHook is passed to ExportDiff; presumably it stops the sandbox
	// before the export completes — confirm against the provider contract.
	closeHook func(context.Context) error
}

func (r *RootfsDiffCreator) process(ctx context.Context, out io.Writer) (*header.DiffMetadata, error) {
// process exports the provider's rootfs diff into out, forwarding the
// configured closeHook to the provider.
func (r *RootfsDiffCreator) process(ctx context.Context, out *os.File) (*header.DiffMetadata, error) {
	return r.rootfs.ExportDiff(ctx, out, r.closeHook)
}
2 changes: 1 addition & 1 deletion packages/orchestrator/pkg/sandbox/rootfs/direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (o *DirectProvider) Start(_ context.Context) error {

func (o *DirectProvider) ExportDiff(
ctx context.Context,
out io.Writer,
out *os.File,
stopSandbox func(context.Context) error,
) (*header.DiffMetadata, error) {
ctx, childSpan := tracer.Start(ctx, "direct-provider-export")
Expand Down
17 changes: 15 additions & 2 deletions packages/orchestrator/pkg/sandbox/rootfs/nbd.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"io"
"os"

"go.uber.org/zap"
Expand Down Expand Up @@ -71,7 +70,7 @@ func (o *NBDProvider) Start(ctx context.Context) error {

func (o *NBDProvider) ExportDiff(
ctx context.Context,
out io.Writer,
out *os.File,
closeSandbox func(ctx context.Context) error,
) (*header.DiffMetadata, error) {
ctx, span := tracer.Start(ctx, "cow-export")
Expand All @@ -93,12 +92,26 @@ func (o *NBDProvider) ExportDiff(
select {
case <-o.finishedOperations:
case <-ctx.Done():
// Close the cache to avoid leaking the mmaped memory. Log an error
// if that failed
closeErr := cache.Close()
if closeErr != nil {
logger.L().Warn(ctx, "error closing cache", zap.Error(closeErr))
}

return nil, fmt.Errorf("timeout waiting for overlay device to be released")
}
telemetry.ReportEvent(ctx, "sandbox stopped")

m, err := cache.ExportToDiff(ctx, out)
if err != nil {
// Close the cache to avoid leaking the mmaped memory. Log an error
// if that failed
closeErr := cache.Close()
if closeErr != nil {
logger.L().Warn(ctx, "error closing cache", zap.Error(closeErr))
}

return nil, fmt.Errorf("error exporting cache: %w", err)
}

Expand Down
3 changes: 1 addition & 2 deletions packages/orchestrator/pkg/sandbox/rootfs/rootfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package rootfs
import (
"context"
"fmt"
"io"
"os"
"syscall"

Expand All @@ -22,7 +21,7 @@ type Provider interface {
Start(ctx context.Context) error
Close(ctx context.Context) error
Path() (string, error)
ExportDiff(ctx context.Context, out io.Writer, closeSandbox func(context.Context) error) (*header.DiffMetadata, error)
ExportDiff(ctx context.Context, out *os.File, closeSandbox func(context.Context) error) (*header.DiffMetadata, error)
}

// flush flushes the data to the operating system's buffer.
Expand Down
2 changes: 1 addition & 1 deletion packages/orchestrator/pkg/sandbox/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -1194,7 +1194,7 @@ func pauseProcessRootfs(
return nil, nil, fmt.Errorf("failed to create rootfs diff: %w", err)
}

rootfsDiffMetadata, err := diffCreator.process(ctx, rootfsDiffFile)
rootfsDiffMetadata, err := diffCreator.process(ctx, rootfsDiffFile.File)
if err != nil {
err = errors.Join(err, rootfsDiffFile.Close())

Expand Down
5 changes: 5 additions & 0 deletions packages/shared/pkg/storage/header/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,18 @@ type DiffMetadataBuilder struct {

// NewDiffMetadataBuilder returns a builder for diff metadata over a region
// of size bytes divided into blockSize-sized blocks. The dirty bitset is
// preallocated to hold one bit per block; the empty bitset starts at zero
// length and grows on demand.
func NewDiffMetadataBuilder(size, blockSize int64) *DiffMetadataBuilder {
	return &DiffMetadataBuilder{
		// TODO: We might be able to start with 0 as preallocating here actually takes space.
		dirty: bitset.New(uint(TotalBlocks(size, blockSize))),
		empty: bitset.New(0),

		blockSize: blockSize,
	}
}

// AddDirtyOffset marks the block containing the given byte offset as dirty
// in the builder's bitset.
func (b *DiffMetadataBuilder) AddDirtyOffset(offset int64) {
	b.dirty.Set(uint(BlockIdx(offset, b.blockSize)))
}

func (b *DiffMetadataBuilder) Process(ctx context.Context, block []byte, out io.Writer, offset int64) error {
blockIdx := BlockIdx(offset, b.blockSize)

Expand Down
Loading