feat(memfd): async pause/copy (wait-on-read) #2626
Conversation
PR Summary (Medium Risk): Reviewed by Cursor Bugbot for commit 3964c31.
Force-pushed d36f015 to 27948cb
❌ 3 Tests Failed (7 flaky tests flagged)
Code Review
The Slice method returns a direct reference to the memfd's memory mapping, which the background runCopy goroutine unmaps upon completion; accessing the slice afterward risks a SIGSEGV. Furthermore, the background copy lacks an explicit synchronization step, such as msync, before closing the memfd, which can result in stale data when the cache file is read from disk.
}

func (m *MemfdCache) Slice(off, length int64) ([]byte, error) {
	m.mu.RLock()
	if m.src != nil {
		defer m.mu.RUnlock()

		return m.src.slice(off, length)
	}
	m.mu.RUnlock()
The Slice method returns a direct reference to the memfd's memory mapping. This is dangerous because the background goroutine runCopy closes the memfd and unmaps its memory as soon as the copy finishes. If a caller attempts to access the returned slice after the background copy completes, it will trigger a SIGSEGV. To ensure safety, the memfd should remain mapped until the MemfdCache itself is closed, or Slice must be implemented in a way that guarantees the lifetime of the returned buffer.
Valid catch — fixed in 9a6a97c. MemfdCache.Slice now returns BytesNotAvailableError while the background copy is in flight, forcing callers through ReadAt (which copies into the caller's buffer, no UAF). After the copy completes m.src is nil and Slice delegates to the underlying *Cache as before.
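For illustration, a minimal caller-side sketch of the fallback this implies. The helper and its import path are assumptions, not code from this PR; only Slice, ReadAt, and BytesNotAvailableError are taken from the changes discussed in this thread.

// Sketch only: try the zero-copy path first, drop to ReadAt while the
// background copy is in flight. Import path assumed from the repo layout.
package example

import (
	"errors"

	"github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/block"
)

func readBlock(c *block.MemfdCache, off, length int64) ([]byte, error) {
	b, err := c.Slice(off, length)
	if err == nil {
		return b, nil // copy finished: zero-copy view into the cache mmap
	}
	var notAvail block.BytesNotAvailableError
	if !errors.As(err, &notAvail) {
		return nil, err
	}
	// Copy still in flight: ReadAt copies into our own buffer, so the bytes
	// stay valid even after runCopy munmaps the memfd.
	buf := make([]byte, length)
	n, err := c.ReadAt(buf, off)
	if err != nil {
		return nil, err
	}
	return buf[:n], nil
}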
	}

	m.mu.Lock()
	src := m.src
	m.src = nil
	m.mu.Unlock()

	if closeErr := src.memfd.Close(); closeErr != nil {
		joined := errors.Join(err, fmt.Errorf("close memfd: %w", closeErr))
The background copy goroutine captures m.src and sets it to nil before closing the memfd. While this correctly routes subsequent ReadAt calls to the underlying cache, it doesn't ensure that the data has been flushed to disk. If a caller uses CachePath() (which calls Wait()) and immediately reads the file from disk, they might encounter stale data because the mmap writes haven't been synchronized. Consider calling msync or fsync before closing the memfd to guarantee data persistence.
Disagree — no msync/fsync needed. The cache file is mmap'd MAP_SHARED, so writes through *m.cache.mmap immediately enter the kernel page cache and are visible to any subsequent file read on the same host. msync/fsync would only matter for crash durability, which isn't a constraint here (a host crash kills the sandbox anyway and the upload runs after the copy finishes). The CachePath caller after Wait() reads through the same page cache and sees the same bytes.
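To make the page-cache argument concrete, here is a small standalone, Linux-only sketch (not code from this PR) showing that bytes written through a MAP_SHARED mapping are visible to an ordinary file read on the same host without any msync:

package main

import (
	"bytes"
	"fmt"
	"os"
	"syscall"
)

func main() {
	f, err := os.CreateTemp("", "mapshared")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())

	const size = 4096
	if err := f.Truncate(size); err != nil {
		panic(err)
	}

	// Map the file MAP_SHARED and write through the mapping.
	m, err := syscall.Mmap(int(f.Fd()), 0, size, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
	if err != nil {
		panic(err)
	}
	copy(m, []byte("written through the mapping"))

	// A plain read(2) on the same host sees the bytes immediately: the mapping
	// and the read share the kernel page cache. msync/fsync only affect
	// durability across a crash, not visibility.
	onDisk, err := os.ReadFile(f.Name())
	if err != nil {
		panic(err)
	}
	fmt.Println(bytes.HasPrefix(onDisk, []byte("written through the mapping"))) // true

	_ = syscall.Munmap(m)
	_ = f.Close()
}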
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Slice returns reference to asynchronously munmapped memory
- Fixed by copying memfd-backed data into a new buffer before returning, preventing use-after-free when background goroutine unmaps the memory.
Or push these changes by commenting:
@cursor push 4e38d8cdcc
Preview (4e38d8cdcc)
diff --git a/packages/orchestrator/pkg/sandbox/block/memfd.go b/packages/orchestrator/pkg/sandbox/block/memfd.go
--- a/packages/orchestrator/pkg/sandbox/block/memfd.go
+++ b/packages/orchestrator/pkg/sandbox/block/memfd.go
@@ -279,7 +279,13 @@
if m.src != nil {
defer m.mu.RUnlock()
- return m.src.slice(off, length)
+ srcSlice, err := m.src.slice(off, length)
+ if err != nil {
+ return nil, err
+ }
+ buf := make([]byte, len(srcSlice))
+ copy(buf, srcSlice)
+ return buf, nil
}
m.mu.RUnlock()
Force-pushed 27948cb to 9a6a97c
Force-pushed e5aedb7 to 6d430ef
Force-pushed 9a6a97c to 2e957b0
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: CachePath wait is uncancellable via context.Background()
- Updated the Diff interface and all implementations to accept a context parameter, allowing Wait calls to respect the caller's cancellation and timeouts instead of blocking indefinitely.
Or push these changes by commenting:
@cursor push 5e202ea1ae
Preview (5e202ea1ae)
diff --git a/packages/orchestrator/pkg/sandbox/block/memfd.go b/packages/orchestrator/pkg/sandbox/block/memfd.go
--- a/packages/orchestrator/pkg/sandbox/block/memfd.go
+++ b/packages/orchestrator/pkg/sandbox/block/memfd.go
@@ -7,11 +7,8 @@
"errors"
"fmt"
"sync"
+ "sync/atomic"
"syscall"
-
- "go.uber.org/zap"
-
- "github.com/e2b-dev/infra/packages/shared/pkg/logger"
)
// Memfd wraps a memfd received from Firecracker
@@ -84,12 +81,91 @@
return err
}
-// MemfdCache wraps a *Cache that is being populated from a memfd.
+// memfdSource indexes the memfd-backed regions by cache offset so that reads
+// against a MemfdCache can be served directly from the memfd while the
+// background copy is still in flight.
+type memfdSource struct {
+ memfd *Memfd
+ entries []memfdRange
+}
+
+type memfdRange struct {
+ cacheStart int64
+ srcStart int64
+ size int64
+}
+
+func newMemfdSource(memfd *Memfd, ranges []Range) *memfdSource {
+ entries := make([]memfdRange, len(ranges))
+ var cacheOff int64
+ for i, r := range ranges {
+ entries[i] = memfdRange{cacheStart: cacheOff, srcStart: r.Start, size: r.Size}
+ cacheOff += r.Size
+ }
+
+ return &memfdSource{memfd: memfd, entries: entries}
+}
+
+func (s *memfdSource) findEntry(cacheOff int64) int {
+ lo, hi := 0, len(s.entries)
+ for lo < hi {
+ mid := (lo + hi) / 2
+ if s.entries[mid].cacheStart > cacheOff {
+ hi = mid
+ } else {
+ lo = mid + 1
+ }
+ }
+ i := lo - 1
+ if i < 0 {
+ return -1
+ }
+ e := s.entries[i]
+ if cacheOff >= e.cacheStart+e.size {
+ return -1
+ }
+
+ return i
+}
+
+func (s *memfdSource) readAt(b []byte, cacheOff int64) (int, error) {
+ n := 0
+ for n < len(b) {
+ i := s.findEntry(cacheOff + int64(n))
+ if i < 0 {
+ return n, nil
+ }
+ e := s.entries[i]
+ offsetInEntry := cacheOff + int64(n) - e.cacheStart
+ toCopy := min(int64(len(b)-n), e.size-offsetInEntry)
+ src, err := s.memfd.Slice(e.srcStart+offsetInEntry, toCopy)
+ if err != nil {
+ return n, fmt.Errorf("memfd slice: %w", err)
+ }
+ copy(b[n:n+int(toCopy)], src)
+ n += int(toCopy)
+ }
+
+ return n, nil
+}
+
+// MemfdCache wraps a *Cache that is being populated from a memfd in the
+// background. Reads are served directly from the memfd until the copy
+// completes; afterwards they delegate to the underlying Cache and the memfd
+// is closed.
type MemfdCache struct {
cache *Cache
- memfd *Memfd
+
+ mu sync.RWMutex // guards src
+ src *memfdSource // nil once the background copy has completed
+ cancel context.CancelFunc
+ done chan struct{}
+ err atomic.Pointer[error]
}
+// NewCacheFromMemfd creates a Cache backed by an in-flight copy from the given
+// memfd. The returned wrapper takes ownership of memfd: callers must Close the
+// wrapper (which also closes the memfd).
func NewCacheFromMemfd(
ctx context.Context,
blockSize int64,
@@ -109,7 +185,6 @@
}
if size == 0 {
- // We can close Memfd. We won't be reading anything out of it.
if closeErr := memfd.Close(); closeErr != nil {
return nil, errors.Join(fmt.Errorf("close memfd: %w", closeErr), cache.Close())
}
@@ -117,34 +192,44 @@
return &MemfdCache{cache: cache}, nil
}
- memfdCache := &MemfdCache{
- cache: cache,
- memfd: memfd,
+ copyCtx, cancel := context.WithCancel(context.WithoutCancel(ctx))
+ m := &MemfdCache{
+ cache: cache,
+ src: newMemfdSource(memfd, ranges),
+ cancel: cancel,
+ done: make(chan struct{}),
}
- err = memfdCache.writeToDisk(ctx, ranges)
+ go m.runCopy(copyCtx, ranges)
+
+ return m, nil
+}
+
+func (m *MemfdCache) runCopy(ctx context.Context, ranges []Range) {
+ defer close(m.done)
+
+ err := m.copyFromMemfd(ctx, ranges)
if err != nil {
- return nil, errors.Join(fmt.Errorf("could not write memfd to disk: %w", err), memfdCache.Close())
+ m.err.Store(&err)
}
- // Close memfd to release the memory
- // At the moment, we always close it. In the future, we will implement
- // copying at the background, so the file descriptor will be kept valid
- if err := memfdCache.memfd.Close(); err != nil {
- logger.L().Warn(ctx, "Could not close memfd", zap.Error(err))
+ m.mu.Lock()
+ src := m.src
+ m.src = nil
+ m.mu.Unlock()
+
+ if closeErr := src.memfd.Close(); closeErr != nil {
+ joined := errors.Join(err, fmt.Errorf("close memfd: %w", closeErr))
+ m.err.Store(&joined)
}
- memfdCache.memfd = nil
-
- return memfdCache, nil
}
-func (m *MemfdCache) writeToDisk(ctx context.Context, ranges []Range) error {
+func (m *MemfdCache) copyFromMemfd(ctx context.Context, ranges []Range) error {
var cacheOff int64
-
for _, r := range ranges {
rangeStart := cacheOff
- src, err := m.memfd.Slice(r.Start, r.Size)
+ src, err := m.src.memfd.Slice(r.Start, r.Size)
if err != nil {
return fmt.Errorf("bad memfd slice [%d,%d): %w", r.Start, r.Start+r.Size, err)
}
@@ -167,11 +252,48 @@
return nil
}
+// Wait blocks until the background copy completes (or ctx is cancelled), and
+// returns any error that occurred.
+func (m *MemfdCache) Wait(ctx context.Context) error {
+ if m.done == nil {
+ return nil
+ }
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-m.done:
+ }
+ if errPtr := m.err.Load(); errPtr != nil {
+ return *errPtr
+ }
+
+ return nil
+}
+
func (m *MemfdCache) ReadAt(b []byte, off int64) (int, error) {
+ m.mu.RLock()
+ if m.src != nil {
+ defer m.mu.RUnlock()
+
+ return m.src.readAt(b, off)
+ }
+ m.mu.RUnlock()
+
return m.cache.ReadAt(b, off)
}
+// Slice returns BytesNotAvailableError while the background copy is in
+// flight: the memfd-backed slice would outlive the RLock and could be
+// Munmap'd asynchronously by runCopy. Callers should fall back to ReadAt
+// (which copies into the caller's buffer) or Wait first.
func (m *MemfdCache) Slice(off, length int64) ([]byte, error) {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+
+ if m.src != nil {
+ return nil, BytesNotAvailableError{}
+ }
+
return m.cache.Slice(off, length)
}
@@ -192,17 +314,10 @@
}
func (m *MemfdCache) Close() error {
- var err error
-
- if m.memfd != nil {
- if e := m.memfd.Close(); e != nil {
- err = fmt.Errorf("error closing memfd: %w", e)
- }
+ if m.cancel != nil {
+ m.cancel()
+ <-m.done
}
- if e := m.cache.Close(); e != nil {
- err = errors.Join(err, fmt.Errorf("error closing cache: %w", e))
- }
-
- return err
+ return m.cache.Close()
}
diff --git a/packages/orchestrator/pkg/sandbox/block/memfd_test.go b/packages/orchestrator/pkg/sandbox/block/memfd_test.go
--- a/packages/orchestrator/pkg/sandbox/block/memfd_test.go
+++ b/packages/orchestrator/pkg/sandbox/block/memfd_test.go
@@ -304,30 +304,82 @@
require.EqualValues(t, 0, sz)
}
-func TestMemfdCache_ContextCancellation(t *testing.T) {
+// Cancelling the parent ctx after construction must not kill the in-flight
+// copy: NewCacheFromMemfd detaches via context.WithoutCancel so the copy
+// can outlive the Pause RPC. Cancellation only happens via Close.
+func TestMemfdCache_ParentContextCancellationDoesNotAbortCopy(t *testing.T) {
t.Parallel()
pageSize := int64(header.PageSize)
size := pageSize * 16
- fd, _ := newTestMemfd(t, size)
+ fd, expected := newTestMemfd(t, size)
ranges := []Range{{Start: 0, Size: size}}
ctx, cancel := context.WithCancel(t.Context())
+
+ cache, err := NewCacheFromMemfd(ctx, pageSize, t.TempDir()+"/cache", NewFromFd(fd, int(size)), ranges)
+ require.NoError(t, err)
+ t.Cleanup(func() { _ = cache.Close() })
+
cancel()
- _, err := NewCacheFromMemfd(ctx, pageSize, t.TempDir()+"/cache", NewFromFd(fd, int(size)), ranges)
- require.ErrorIs(t, err, context.Canceled)
+ require.NoError(t, cache.Wait(t.Context()))
+
+ got := make([]byte, size)
+ n, err := cache.ReadAt(got, 0)
+ require.NoError(t, err)
+ require.Equal(t, int(size), n)
+ require.Equal(t, expected, got)
}
-// On the happy path, NewCacheFromMemfd closes the memfd internally and nils
-// the field, so subsequent MemfdCache.Close must still cleanly close the
-// underlying *Cache without trying to re-close the memfd. The cache file is
-// then removed by Cache.Close.
-func TestMemfdCache_CloseAfterSuccessfulPopulationRemovesCacheFile(t *testing.T) {
+// Wait blocks until the background copy completes; after it returns the
+// cache file on disk must contain the full payload.
+func TestMemfdCache_WaitWritesFile(t *testing.T) {
t.Parallel()
pageSize := int64(header.PageSize)
+ size := pageSize * 12
+ fd, expected := newTestMemfd(t, size)
+
+ cachePath := t.TempDir() + "/cache"
+ cache, err := NewCacheFromMemfd(t.Context(), pageSize, cachePath, NewFromFd(fd, int(size)), []Range{{Start: 0, Size: size}})
+ require.NoError(t, err)
+ t.Cleanup(func() { _ = cache.Close() })
+
+ require.NoError(t, cache.Wait(t.Context()))
+
+ fromFile, err := os.ReadFile(cachePath)
+ require.NoError(t, err)
+ require.Equal(t, expected, fromFile)
+}
+
+// Slice must refuse to hand out memfd-backed views while the copy is still
+// in flight, since runCopy would Munmap them asynchronously.
+func TestMemfdCache_SliceUnavailableUntilCopyDone(t *testing.T) {
+ t.Parallel()
+
+ pageSize := int64(header.PageSize)
+ size := pageSize * 8
+ fd, _ := newTestMemfd(t, size)
+
+ cache, err := NewCacheFromMemfd(t.Context(), pageSize, t.TempDir()+"/cache", NewFromFd(fd, int(size)), []Range{{Start: 0, Size: size}})
+ require.NoError(t, err)
+ t.Cleanup(func() { _ = cache.Close() })
+
+ require.NoError(t, cache.Wait(t.Context()))
+
+ _, err = cache.Slice(0, pageSize)
+ require.NoError(t, err)
+}
+
+// MemfdCache.Close must wait for the background copy goroutine before
+// removing the cache file, otherwise the goroutine would race with file
+// teardown.
+func TestMemfdCache_CloseRemovesCacheFile(t *testing.T) {
+ t.Parallel()
+
+ pageSize := int64(header.PageSize)
size := pageSize * 4
fd, _ := newTestMemfd(t, size)
diff --git a/packages/orchestrator/pkg/sandbox/build/diff.go b/packages/orchestrator/pkg/sandbox/build/diff.go
--- a/packages/orchestrator/pkg/sandbox/build/diff.go
+++ b/packages/orchestrator/pkg/sandbox/build/diff.go
@@ -31,7 +31,7 @@
storage.SeekableReader
block.FramedSlicer
CacheKey() DiffStoreKey
- CachePath() (string, error)
+ CachePath(ctx context.Context) (string, error)
FileSize() (int64, error)
BlockSize() int64
Init(ctx context.Context) error
@@ -41,7 +41,7 @@
var _ Diff = (*NoDiff)(nil)
-func (n *NoDiff) CachePath() (string, error) {
+func (n *NoDiff) CachePath(_ context.Context) (string, error) {
return "", nil
}
diff --git a/packages/orchestrator/pkg/sandbox/build/local_diff.go b/packages/orchestrator/pkg/sandbox/build/local_diff.go
--- a/packages/orchestrator/pkg/sandbox/build/local_diff.go
+++ b/packages/orchestrator/pkg/sandbox/build/local_diff.go
@@ -109,7 +109,15 @@
return NewLocalDiffFromCache(cacheKey, cache)
}
-func (b *localDiff) CachePath() (string, error) {
+func (b *localDiff) CachePath(ctx context.Context) (string, error) {
+ if w, ok := b.cache.(interface {
+ Wait(ctx context.Context) error
+ }); ok {
+ if err := w.Wait(ctx); err != nil {
+ return "", fmt.Errorf("memfd copy: %w", err)
+ }
+ }
+
return b.cache.Path(), nil
}
diff --git a/packages/orchestrator/pkg/sandbox/build/mocks/mockdiff.go b/packages/orchestrator/pkg/sandbox/build/mocks/mockdiff.go
--- a/packages/orchestrator/pkg/sandbox/build/mocks/mockdiff.go
+++ b/packages/orchestrator/pkg/sandbox/build/mocks/mockdiff.go
@@ -130,8 +130,8 @@
}
// CachePath provides a mock function for the type MockDiff
-func (_mock *MockDiff) CachePath() (string, error) {
- ret := _mock.Called()
+func (_mock *MockDiff) CachePath(ctx context.Context) (string, error) {
+ ret := _mock.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for CachePath")
@@ -139,16 +139,16 @@
var r0 string
var r1 error
- if returnFunc, ok := ret.Get(0).(func() (string, error)); ok {
- return returnFunc()
+ if returnFunc, ok := ret.Get(0).(func(context.Context) (string, error)); ok {
+ return returnFunc(ctx)
}
- if returnFunc, ok := ret.Get(0).(func() string); ok {
- r0 = returnFunc()
+ if returnFunc, ok := ret.Get(0).(func(context.Context) string); ok {
+ r0 = returnFunc(ctx)
} else {
r0 = ret.Get(0).(string)
}
- if returnFunc, ok := ret.Get(1).(func() error); ok {
- r1 = returnFunc()
+ if returnFunc, ok := ret.Get(1).(func(context.Context) error); ok {
+ r1 = returnFunc(ctx)
} else {
r1 = ret.Error(1)
}
@@ -161,13 +161,14 @@
}
// CachePath is a helper method to define mock.On call
-func (_e *MockDiff_Expecter) CachePath() *MockDiff_CachePath_Call {
- return &MockDiff_CachePath_Call{Call: _e.mock.On("CachePath")}
+// - ctx context.Context
+func (_e *MockDiff_Expecter) CachePath(ctx interface{}) *MockDiff_CachePath_Call {
+ return &MockDiff_CachePath_Call{Call: _e.mock.On("CachePath", ctx)}
}
-func (_c *MockDiff_CachePath_Call) Run(run func()) *MockDiff_CachePath_Call {
+func (_c *MockDiff_CachePath_Call) Run(run func(ctx context.Context)) *MockDiff_CachePath_Call {
_c.Call.Run(func(args mock.Arguments) {
- run()
+ run(args[0].(context.Context))
})
return _c
}
@@ -177,7 +178,7 @@
return _c
}
-func (_c *MockDiff_CachePath_Call) RunAndReturn(run func() (string, error)) *MockDiff_CachePath_Call {
+func (_c *MockDiff_CachePath_Call) RunAndReturn(run func(context.Context) (string, error)) *MockDiff_CachePath_Call {
_c.Call.Return(run)
return _c
}
diff --git a/packages/orchestrator/pkg/sandbox/build/storage_diff.go b/packages/orchestrator/pkg/sandbox/build/storage_diff.go
--- a/packages/orchestrator/pkg/sandbox/build/storage_diff.go
+++ b/packages/orchestrator/pkg/sandbox/build/storage_diff.go
@@ -123,7 +123,7 @@
}
// The local file might not be synced.
-func (b *StorageDiff) CachePath() (string, error) {
+func (b *StorageDiff) CachePath(_ context.Context) (string, error) {
return b.cachePath, nil
}
diff --git a/packages/orchestrator/pkg/sandbox/build_upload_v3.go b/packages/orchestrator/pkg/sandbox/build_upload_v3.go
--- a/packages/orchestrator/pkg/sandbox/build_upload_v3.go
+++ b/packages/orchestrator/pkg/sandbox/build_upload_v3.go
@@ -14,12 +14,12 @@
)
func (u *Upload) runV3(ctx context.Context) error {
- memfilePath, err := u.snap.MemfileDiff.CachePath()
+ memfilePath, err := u.snap.MemfileDiff.CachePath(ctx)
if err != nil {
return fmt.Errorf("error getting memfile diff path: %w", err)
}
- rootfsPath, err := u.snap.RootfsDiff.CachePath()
+ rootfsPath, err := u.snap.RootfsDiff.CachePath(ctx)
if err != nil {
return fmt.Errorf("error getting rootfs diff path: %w", err)
}
diff --git a/packages/orchestrator/pkg/sandbox/build_upload_v4.go b/packages/orchestrator/pkg/sandbox/build_upload_v4.go
--- a/packages/orchestrator/pkg/sandbox/build_upload_v4.go
+++ b/packages/orchestrator/pkg/sandbox/build_upload_v4.go
@@ -15,12 +15,12 @@
)
func (u *Upload) runV4(ctx context.Context) error {
- memSrc, err := u.snap.MemfileDiff.CachePath()
+ memSrc, err := u.snap.MemfileDiff.CachePath(ctx)
if err != nil {
return fmt.Errorf("memfile diff path: %w", err)
}
- rootfsSrc, err := u.snap.RootfsDiff.CachePath()
+ rootfsSrc, err := u.snap.RootfsDiff.CachePath(ctx)
if err != nil {
return fmt.Errorf("rootfs diff path: %w", err)
}
diff --git a/packages/shared/pkg/featureflags/flags.go b/packages/shared/pkg/featureflags/flags.go
--- a/packages/shared/pkg/featureflags/flags.go
+++ b/packages/shared/pkg/featureflags/flags.go
@@ -122,9 +122,9 @@
// UseMemFdFlag enables memfd-backed guest memory. When enabled, Firecracker
// allocates guest memory via memfd_create and passes the fd to the UFFD
- // handler over the UFFD socket on snapshot restore. This allows the
- // orchestrator to read dirty pages via pread without having to call
- // process_vm_readv() to copy memory.
+ // handler over the UFFD socket on snapshot restore. This lets the
+ // orchestrator mmap the memfd directly to copy dirty pages, instead of
+ // calling process_vm_readv() across processes.
UseMemFdFlag = NewBoolFlag("use-memfd", false)
// PeerToPeerChunkTransferFlag enables peer-to-peer chunk routing.
Force-pushed 2e957b0 to 2027fa0
Force-pushed 9c553f4 to 66be8a2
Force-pushed 119b176 to c34e39c
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Data race on Memfd lazy mmap initialization
- Added sync.Once to protect lazy mmap initialization in Memfd.Slice, preventing concurrent goroutines from performing double-mmap operations.
Or push these changes by commenting:
@cursor push 1e0ba6164e
Preview (1e0ba6164e)
diff --git a/packages/orchestrator/pkg/sandbox/block/memfd.go b/packages/orchestrator/pkg/sandbox/block/memfd.go
--- a/packages/orchestrator/pkg/sandbox/block/memfd.go
+++ b/packages/orchestrator/pkg/sandbox/block/memfd.go
@@ -6,14 +6,18 @@
"context"
"errors"
"fmt"
+ "sync"
+ "sync/atomic"
"syscall"
)
// Memfd wraps a memfd received from Firecracker.
type Memfd struct {
- fd int
- size int
- mmap []byte
+ fd int
+ size int
+ mmap []byte
+ mmapOnce sync.Once
+ mmapErr error
}
func NewFromFd(fd, size int) *Memfd {
@@ -23,12 +27,16 @@
// Slice returns a zero-copy view of [offset, offset+size). Valid until Close.
// The underlying mmap is created lazily on first call.
func (m *Memfd) Slice(offset, size int64) ([]byte, error) {
- if m.mmap == nil {
+ m.mmapOnce.Do(func() {
b, err := syscall.Mmap(m.fd, 0, m.size, syscall.PROT_READ, syscall.MAP_SHARED)
if err != nil {
- return nil, fmt.Errorf("mmap memfd: %w", err)
+ m.mmapErr = fmt.Errorf("mmap memfd: %w", err)
+ return
}
m.mmap = b
+ })
+ if m.mmapErr != nil {
+ return nil, m.mmapErr
}
if offset < 0 || offset+size > int64(m.size) {
return nil, fmt.Errorf("range [%d, %d) out of bounds (size %d)", offset, offset+size, m.size)
@@ -56,10 +64,21 @@
}
// MemfdCache is a Cache populated from a memfd. The memfd is consumed (and
-// closed) during construction; the wrapper exists so the upcoming async-copy
-// and dedup PRs can attach extra state without churning callers.
+// closed) during construction; the wrapper exists so the upcoming dedup PR
+// can attach extra state without churning callers.
+//
+// When constructed via NewCacheFromMemfdAsync the copy runs in a background
+// goroutine and the memfd lifetime extends until the goroutine finishes.
+// In-flight reads route through the memfd via memfdSource; afterwards they
+// delegate to the embedded Cache and the memfd is closed.
type MemfdCache struct {
*Cache
+
+ mu sync.RWMutex // guards src
+ src *memfdSource // non-nil while the background copy is in flight
+ cancel context.CancelFunc
+ done chan struct{}
+ err atomic.Pointer[error]
}
func NewCacheFromMemfd(
@@ -83,7 +102,123 @@
return &MemfdCache{Cache: cache}, nil
}
-// copyFromMemfd is the seam the upcoming async-copy and dedup PRs replace.
+// NewCacheFromMemfdAsync returns a MemfdCache that streams the memfd into
+// the cache on a background goroutine. The caller's gRPC handler can return
+// as soon as the snapshot file and diff metadata are written; the FC stop
+// and the memfd copy then run in parallel after the response. The returned
+// wrapper takes ownership of memfd; callers must Close it (which also
+// cancels and joins the copy goroutine).
+//
+// Reads against the wrapper are served from the memfd via memfdSource while
+// the copy is in flight; afterwards they delegate to the embedded Cache and
+// the memfd is closed.
+func NewCacheFromMemfdAsync(
+ ctx context.Context,
+ blockSize int64,
+ filePath string,
+ memfd *Memfd,
+ ranges []Range,
+) (*MemfdCache, error) {
+ cache, err := NewCache(GetSize(ranges), blockSize, filePath, false)
+ if err != nil {
+ return nil, errors.Join(err, memfd.Close())
+ }
+ if len(ranges) == 0 {
+ if closeErr := memfd.Close(); closeErr != nil {
+ return nil, errors.Join(fmt.Errorf("close memfd: %w", closeErr), cache.Close())
+ }
+
+ return &MemfdCache{Cache: cache}, nil
+ }
+
+ // Detach from the request context so the copy can outlive Pause; Close
+ // drives cancellation instead.
+ copyCtx, cancel := context.WithCancel(context.WithoutCancel(ctx))
+ m := &MemfdCache{
+ Cache: cache,
+ src: newMemfdSource(memfd, ranges),
+ cancel: cancel,
+ done: make(chan struct{}),
+ }
+
+ go m.runCopy(copyCtx, ranges)
+
+ return m, nil
+}
+
+func (m *MemfdCache) runCopy(ctx context.Context, ranges []Range) {
+ defer close(m.done)
+
+ err := copyFromMemfd(ctx, m.Cache, m.src.memfd, ranges)
+ if err != nil {
+ m.err.Store(&err)
+ }
+
+ m.mu.Lock()
+ src := m.src
+ m.src = nil
+ m.mu.Unlock()
+
+ if closeErr := src.memfd.Close(); closeErr != nil {
+ joined := errors.Join(err, fmt.Errorf("close memfd: %w", closeErr))
+ m.err.Store(&joined)
+ }
+}
+
+// Wait blocks until the background copy completes (or ctx is cancelled).
+func (m *MemfdCache) Wait(ctx context.Context) error {
+ if m.done == nil {
+ return nil
+ }
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-m.done:
+ }
+ if errPtr := m.err.Load(); errPtr != nil {
+ return *errPtr
+ }
+
+ return nil
+}
+
+func (m *MemfdCache) ReadAt(b []byte, off int64) (int, error) {
+ m.mu.RLock()
+ if m.src != nil {
+ defer m.mu.RUnlock()
+
+ return m.src.readAt(b, off)
+ }
+ m.mu.RUnlock()
+
+ return m.Cache.ReadAt(b, off)
+}
+
+// Slice returns BytesNotAvailableError while the copy is in flight: the
+// memfd-backed slice would outlive the RLock and could be Munmap'd
+// asynchronously. Callers fall back to ReadAt or Wait first.
+func (m *MemfdCache) Slice(off, length int64) ([]byte, error) {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+
+ if m.src != nil {
+ return nil, BytesNotAvailableError{}
+ }
+
+ return m.Cache.Slice(off, length)
+}
+
+func (m *MemfdCache) Close() error {
+ if m.cancel != nil {
+ m.cancel()
+ <-m.done
+ }
+
+ return m.Cache.Close()
+}
+
+// copyFromMemfd is the seam the dedup PR replaces with a compare-conditional
+// memcpy. Kept sequential so the dedup drop-in stays trivial.
func copyFromMemfd(ctx context.Context, cache *Cache, memfd *Memfd, ranges []Range) error {
var cacheOff int64
for _, r := range ranges {
@@ -105,3 +240,66 @@
return nil
}
+
+// memfdSource indexes the memfd-backed ranges by cache offset so reads can
+// be served from the memfd while the background copy is still in flight.
+type memfdSource struct {
+ memfd *Memfd
+ entries []memfdRange
+}
+
+type memfdRange struct {
+ cacheStart int64
+ srcStart int64
+ size int64
+}
+
+func newMemfdSource(memfd *Memfd, ranges []Range) *memfdSource {
+ entries := make([]memfdRange, len(ranges))
+ var cacheOff int64
+ for i, r := range ranges {
+ entries[i] = memfdRange{cacheStart: cacheOff, srcStart: r.Start, size: r.Size}
+ cacheOff += r.Size
+ }
+
+ return &memfdSource{memfd: memfd, entries: entries}
+}
+
+func (s *memfdSource) findEntry(cacheOff int64) int {
+ lo, hi := 0, len(s.entries)
+ for lo < hi {
+ mid := (lo + hi) / 2
+ if s.entries[mid].cacheStart > cacheOff {
+ hi = mid
+ } else {
+ lo = mid + 1
+ }
+ }
+ i := lo - 1
+ if i < 0 || cacheOff >= s.entries[i].cacheStart+s.entries[i].size {
+ return -1
+ }
+
+ return i
+}
+
+func (s *memfdSource) readAt(b []byte, cacheOff int64) (int, error) {
+ n := 0
+ for n < len(b) {
+ i := s.findEntry(cacheOff + int64(n))
+ if i < 0 {
+ return n, nil
+ }
+ e := s.entries[i]
+ offsetInEntry := cacheOff + int64(n) - e.cacheStart
+ toCopy := min(int64(len(b)-n), e.size-offsetInEntry)
+ src, err := s.memfd.Slice(e.srcStart+offsetInEntry, toCopy)
+ if err != nil {
+ return n, fmt.Errorf("memfd slice: %w", err)
+ }
+ copy(b[n:n+int(toCopy)], src)
+ n += int(toCopy)
+ }
+
+ return n, nil
+}
diff --git a/packages/orchestrator/pkg/sandbox/block/memfd_test.go b/packages/orchestrator/pkg/sandbox/block/memfd_test.go
--- a/packages/orchestrator/pkg/sandbox/block/memfd_test.go
+++ b/packages/orchestrator/pkg/sandbox/block/memfd_test.go
@@ -5,6 +5,7 @@
import (
"context"
"crypto/rand"
+ "os"
"syscall"
"testing"
@@ -126,3 +127,54 @@
_, err := NewCacheFromMemfd(ctx, pageSize, t.TempDir()+"/cache", NewFromFd(fd, int(size)), []Range{{Start: 0, Size: size}})
require.ErrorIs(t, err, context.Canceled)
}
+
+// Cancelling the parent ctx after construction must not abort the in-flight
+// copy: NewCacheFromMemfdAsync detaches via context.WithoutCancel so the
+// copy outlives the Pause RPC. Cancellation happens via Close.
+func TestMemfdCacheAsync_ParentContextCancellationDoesNotAbortCopy(t *testing.T) {
+ t.Parallel()
+
+ pageSize := int64(header.PageSize)
+ size := pageSize * 16
+
+ fd, expected := newTestMemfd(t, size)
+ ctx, cancel := context.WithCancel(t.Context())
+
+ cache, err := NewCacheFromMemfdAsync(ctx, pageSize, t.TempDir()+"/cache", NewFromFd(fd, int(size)), []Range{{Start: 0, Size: size}})
+ require.NoError(t, err)
+ t.Cleanup(func() { _ = cache.Close() })
+
+ cancel()
+ require.NoError(t, cache.Wait(t.Context()))
+
+ got := make([]byte, size)
+ n, err := cache.ReadAt(got, 0)
+ require.NoError(t, err)
+ require.Equal(t, int(size), n)
+ require.Equal(t, expected, got)
+}
+
+// Wait flushes the background copy; afterwards Slice succeeds (the
+// BytesNotAvailableError gate has lifted) and the cache file on disk has
+// the full payload.
+func TestMemfdCacheAsync_WaitFlushesToFile(t *testing.T) {
+ t.Parallel()
+
+ pageSize := int64(header.PageSize)
+ size := pageSize * 12
+
+ fd, expected := newTestMemfd(t, size)
+ cachePath := t.TempDir() + "/cache"
+ cache, err := NewCacheFromMemfdAsync(t.Context(), pageSize, cachePath, NewFromFd(fd, int(size)), []Range{{Start: 0, Size: size}})
+ require.NoError(t, err)
+ t.Cleanup(func() { _ = cache.Close() })
+
+ require.NoError(t, cache.Wait(t.Context()))
+
+ _, err = cache.Slice(0, pageSize)
+ require.NoError(t, err)
+
+ fromFile, err := os.ReadFile(cachePath)
+ require.NoError(t, err)
+ require.Equal(t, expected, fromFile)
+}
diff --git a/packages/orchestrator/pkg/sandbox/build/local_diff.go b/packages/orchestrator/pkg/sandbox/build/local_diff.go
--- a/packages/orchestrator/pkg/sandbox/build/local_diff.go
+++ b/packages/orchestrator/pkg/sandbox/build/local_diff.go
@@ -110,6 +110,14 @@
}
func (b *localDiff) CachePath() (string, error) {
+ if w, ok := b.cache.(interface {
+ Wait(ctx context.Context) error
+ }); ok {
+ if err := w.Wait(context.Background()); err != nil {
+ return "", fmt.Errorf("memfd copy: %w", err)
+ }
+ }
+
return b.cache.Path(), nil
}
diff --git a/packages/orchestrator/pkg/sandbox/fc/memory.go b/packages/orchestrator/pkg/sandbox/fc/memory.go
--- a/packages/orchestrator/pkg/sandbox/fc/memory.go
+++ b/packages/orchestrator/pkg/sandbox/fc/memory.go
@@ -29,6 +29,7 @@
cachePath string,
blockSize int64,
memfd *block.Memfd,
+ bgCopy bool,
) (block.Cacher, error) {
var guestRanges []block.Range
@@ -36,7 +37,11 @@
guestRanges = append(guestRanges, r)
}
- cache, err := block.NewCacheFromMemfd(ctx, blockSize, cachePath, memfd, guestRanges)
+ ctor := block.NewCacheFromMemfd
+ if bgCopy {
+ ctor = block.NewCacheFromMemfdAsync
+ }
+ cache, err := ctor(ctx, blockSize, cachePath, memfd, guestRanges)
if err != nil {
return nil, fmt.Errorf("create MemfdCache: %w", err)
}
@@ -85,10 +90,11 @@
cachePath string,
blockSize int64,
memfd *block.Memfd,
+ bgCopy bool,
) (block.Cacher, error) {
if memfd == nil {
return p.exportMemoryFromFc(ctx, include, cachePath, blockSize)
}
- return p.exportMemoryFromMemfd(ctx, include, cachePath, blockSize, memfd)
+ return p.exportMemoryFromMemfd(ctx, include, cachePath, blockSize, memfd, bgCopy)
}
diff --git a/packages/orchestrator/pkg/sandbox/sandbox.go b/packages/orchestrator/pkg/sandbox/sandbox.go
--- a/packages/orchestrator/pkg/sandbox/sandbox.go
+++ b/packages/orchestrator/pkg/sandbox/sandbox.go
@@ -1136,6 +1136,7 @@
s.config.DefaultCacheDir,
s.process,
s.memory.Memfd(ctx),
+ s.featureFlags.BoolFlag(ctx, featureflags.MemfdBackgroundCopyFlag),
)
if err != nil {
return nil, fmt.Errorf("error while post processing: %w", err)
@@ -1204,6 +1205,7 @@
cacheDir string,
fc *fc.Process,
memfd *block.Memfd,
+ bgCopy bool,
) (d build.Diff, h *header.Header, e error) {
ctx, span := tracer.Start(ctx, "process-memory")
defer span.End()
@@ -1226,6 +1228,7 @@
memfileDiffPath,
diffMetadata.BlockSize,
memfd,
+ bgCopy,
)
if err != nil {
return nil, nil, fmt.Errorf("failed to export memory: %w", err)
diff --git a/packages/shared/pkg/featureflags/flags.go b/packages/shared/pkg/featureflags/flags.go
--- a/packages/shared/pkg/featureflags/flags.go
+++ b/packages/shared/pkg/featureflags/flags.go
@@ -126,6 +126,11 @@
// dirty pages instead of calling process_vm_readv() across processes.
UseMemFdFlag = NewBoolFlag("use-memfd", false)
+ // MemfdBackgroundCopyFlag streams the memfd into the snapshot cache on a
+ // background goroutine, so gRPC Pause can return as soon as the diff
+ // metadata is written. Only takes effect when UseMemFdFlag is also on.
+ MemfdBackgroundCopyFlag = NewBoolFlag("memfd-background-copy", false)
+
// PeerToPeerChunkTransferFlag enables peer-to-peer chunk routing.
PeerToPeerChunkTransferFlag = NewBoolFlag("peer-to-peer-chunk-transfer", false)
// PeerToPeerAsyncCheckpointFlag makes Checkpoint upload fire-and-forget instead
Force-pushed 83b5f20 to 402c402
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Memfd close failure overwrites successful copy result
- Modified runCopy to only store memfd close errors when the copy itself failed, preventing successful data copies from being invalidated by cleanup failures.
Or push these changes by commenting:
@cursor push 2a0008f770
Preview (2a0008f770)
diff --git a/packages/orchestrator/pkg/sandbox/block/memfd.go b/packages/orchestrator/pkg/sandbox/block/memfd.go
--- a/packages/orchestrator/pkg/sandbox/block/memfd.go
+++ b/packages/orchestrator/pkg/sandbox/block/memfd.go
@@ -172,8 +172,10 @@
m.mu.Unlock()
if closeErr := src.memfd.Close(); closeErr != nil {
- joined := errors.Join(err, fmt.Errorf("close memfd: %w", closeErr))
- m.err.Store(&joined)
+ if err != nil {
+ joined := errors.Join(err, fmt.Errorf("close memfd: %w", closeErr))
+ m.err.Store(&joined)
+ }
}
}
Force-pushed 5b861f4 to 4c3d3b4
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Memfd and hugepages not released after copy completes
- Added m.src = nil in runCopy after a successful copy to release the memfd and hugepages, allowing reads to delegate to the underlying Cache.
Or push these changes by commenting:
@cursor push 87c8ee0df1
Preview (87c8ee0df1)
diff --git a/packages/orchestrator/pkg/sandbox/block/memfd.go b/packages/orchestrator/pkg/sandbox/block/memfd.go
--- a/packages/orchestrator/pkg/sandbox/block/memfd.go
+++ b/packages/orchestrator/pkg/sandbox/block/memfd.go
@@ -165,6 +165,9 @@
func (m *MemfdCache) runCopy(ctx context.Context, dirty *roaring.Bitmap, blockSize int64) {
m.err = copyFromMemfd(ctx, m.Cache, m.memfd, dirty, blockSize)
+ if m.err == nil {
+ m.src = nil
+ }
close(m.done)
}
diff --git a/packages/orchestrator/pkg/sandbox/block/memfd_test.go b/packages/orchestrator/pkg/sandbox/block/memfd_test.go
--- a/packages/orchestrator/pkg/sandbox/block/memfd_test.go
+++ b/packages/orchestrator/pkg/sandbox/block/memfd_test.go
@@ -134,3 +134,30 @@
require.Equal(t, expected[srcBlock*pageSize:(srcBlock+1)*pageSize], slice, "Slice block %d", i)
}
}
+
+// After the background copy completes, reads must delegate to the underlying
+// Cache (m.src becomes nil), releasing the memfd and its hugepage-backed memory.
+func TestNewCacheFromMemfdAsync_ReleasesSrcAfterCopy(t *testing.T) {
+ t.Parallel()
+
+ pageSize := int64(header.PageSize)
+ memfd, expected := newTestMemfd(t, pageSize*4)
+
+ dirty := roaring.New()
+ dirty.AddRange(0, 4)
+
+ cache, err := NewCacheFromMemfdAsync(t.Context(), pageSize, t.TempDir()+"/cache", memfd, dirty)
+ require.NoError(t, err)
+ t.Cleanup(func() { _ = cache.Close() })
+
+ require.NoError(t, cache.Wait(t.Context()))
+
+ // After Wait, m.src must be nil so reads delegate to Cache.
+ require.Nil(t, cache.src, "m.src should be nil after copy completes")
+
+ // Reads must still work via the Cache.
+ got := make([]byte, pageSize)
+ _, err = cache.ReadAt(got, 0)
+ require.NoError(t, err)
+ require.Equal(t, expected[:pageSize], got)
+}
Reviewed by Cursor Bugbot for commit 4c3d3b4.
Introduce the MemfdCache wrapper (embedding *Cache) plus the NewCacheFromMemfdAsync constructor: copy runs on a goroutine so gRPC Pause can return as soon as the snapshot file and diff metadata are written. The MemfdBackgroundCopyFlag gates the dispatch in fc.ExportMemory; flag-off keeps the existing sync NewCacheFromMemfd path untouched. In-flight reads route through a memfdSource indexed by cache offset; afterwards they delegate to the embedded Cache and the memfd is closed. Slice returns BytesNotAvailableError while the copy is in flight to prevent UAF on the asynchronous Munmap; callers fall back to ReadAt or Wait first. localDiff takes block.DiffSource and uses a Wait type-assertion in CachePath so existing FS-reading upload paths see complete data.
Force-pushed 4c3d3b4 to 8d3780f
In-flight reads belong in the experimental v2 design. Here MemfdCache is just a wait-on-read wrapper: ReadAt/Slice block on Wait, then delegate to the embedded Cache. Drops memfdSource, sync.RWMutex, and the read-during-copy test.
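A rough sketch of that wait-on-read shape, reusing field names from the previews above; the merged code may differ in detail.

// ReadAt blocks until the background copy has landed, then delegates to the
// embedded *Cache; Slice (and CachePath on the diff side) follow the same idea.
func (m *MemfdCache) ReadAt(b []byte, off int64) (int, error) {
	if m.done != nil { // nil when the wrapper was built on the synchronous path
		<-m.done // closed by runCopy once the memfd → cache copy finishes
		if m.err != nil {
			return 0, m.err
		}
	}
	return m.Cache.ReadAt(b, off)
}

func (m *MemfdCache) Slice(off, length int64) ([]byte, error) {
	if m.done != nil {
		<-m.done
		if m.err != nil {
			return nil, m.err
		}
	}
	return m.Cache.Slice(off, length)
}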


Background goroutine copies memfd → diff cache after Pause returns. The
snapshot file + diff metadata are written synchronously; the byte copy
runs detached so Pause returns sooner. Memfd is closed (hugetlb pages
released) at the end of the copy.
Reads (ReadAt/Slice/CachePath) Wait for the copy before serving.
Gated by MemfdBackgroundCopyFlag; requires UseMemFdFlag (#2522).
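For orientation, a trivial hypothetical helper (not code from this PR) spelling out how the two flags compose; the boolean inputs would come from the orchestrator's feature-flag client (s.featureFlags.BoolFlag in the previews above).

// memfdCopyMode is illustrative only. With UseMemFdFlag off there is no memfd
// and the export falls back to the Firecracker snapshot-file path; with it on,
// MemfdBackgroundCopyFlag picks between the existing synchronous
// NewCacheFromMemfd and the new NewCacheFromMemfdAsync.
func memfdCopyMode(useMemfd, backgroundCopy bool) (syncCopy, asyncCopy bool) {
	if !useMemfd {
		return false, false
	}
	return !backgroundCopy, backgroundCopy
}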