Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/orchestrator/chunks.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ message PeerAvailability {
// caller should switch to reading from remote storage directly instead of
// this peer.
bool use_storage = 2;
// header_bytes carries the post-upload V4 header (framed files only).
bytes header_bytes = 3;
}

message GetBuildFileSizeRequest {
Expand Down
4 changes: 4 additions & 0 deletions packages/orchestrator/pkg/sandbox/block/streaming_chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ func (c *Chunker) runFetch(ctx context.Context, s *fetchSession, ft *storage.Fra
fetchTimer := c.metrics.RemoteReadsTimerFactory.Begin()

readBytes, err := c.progressiveRead(ctx, s, mmapSlice, ft)
// Retry once on ErrPeerAborted; second read will go to remote storage.
if errors.Is(err, storage.ErrPeerAborted) {
readBytes, err = c.progressiveRead(ctx, s, mmapSlice, ft)
}
if err != nil {
fetchTimer.RecordRaw(ctx, readBytes, attrs.remoteFailure)

Expand Down
100 changes: 38 additions & 62 deletions packages/orchestrator/pkg/sandbox/build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,25 @@ package build

import (
"context"
"errors"
"fmt"
Comment thread
levb marked this conversation as resolved.
"io"
"sync/atomic"

"github.com/google/uuid"
"go.uber.org/zap"

blockmetrics "github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/block/metrics"
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
"github.com/e2b-dev/infra/packages/shared/pkg/storage"
"github.com/e2b-dev/infra/packages/shared/pkg/storage/header"
)

type headerProvider interface {
PendingHeader(buildID, name string) *header.Header
}

type File struct {
header atomic.Pointer[header.Header]
headers headerProvider
store *DiffStore
fileType DiffType
persistence storage.StorageProvider
Expand All @@ -33,10 +36,12 @@ func NewFile(
persistence storage.StorageProvider,
metrics blockmetrics.Metrics,
) *File {
hp, _ := persistence.(headerProvider)
f := &File{
store: store,
fileType: fileType,
persistence: persistence,
headers: hp,
metrics: metrics,
}
f.header.Store(header)
Expand All @@ -54,7 +59,7 @@ func (b *File) SwapHeader(h *header.Header) {

func (b *File) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
for n < len(p) {
h := b.Header()
h := b.installPendingHeader()

mappedToBuild, err := h.GetShiftedMapping(ctx, off+int64(n))
if err != nil {
Expand Down Expand Up @@ -105,12 +110,6 @@ func (b *File) ReadAt(ctx context.Context, p []byte, off int64) (n int, err erro
ft,
)
if err != nil {
if retry, swapErr := b.retryOnTransition(ctx, err); retry {
continue
} else if swapErr != nil {
return 0, swapErr
}

return 0, fmt.Errorf("failed to read from source: %w", err)
}

Expand All @@ -122,68 +121,45 @@ func (b *File) ReadAt(ctx context.Context, p []byte, off int64) (n int, err erro

// The slice access must be in the predefined blocksize of the build.
func (b *File) Slice(ctx context.Context, off, _ int64) ([]byte, error) {
for {
h := b.Header()

mappedBuild, err := h.GetShiftedMapping(ctx, off)
if err != nil {
return nil, fmt.Errorf("failed to get mapping: %w", err)
}
h := b.installPendingHeader()

// Pass empty huge page when the build id is nil.
if mappedBuild.BuildId == uuid.Nil {
return header.EmptyHugePage, nil
}

size := b.buildFileSize(h, mappedBuild.BuildId)
ft := h.GetBuildFrameData(mappedBuild.BuildId)
diff, err := b.getBuild(ctx, mappedBuild.BuildId, size, ft.CompressionType())
if err != nil {
return nil, fmt.Errorf("failed to get build: %w", err)
}

result, err := diff.Slice(ctx, int64(mappedBuild.Offset), int64(h.Metadata.BlockSize), ft)
if err != nil {
if retry, swapErr := b.retryOnTransition(ctx, err); retry {
continue
} else if swapErr != nil {
return nil, swapErr
}

return nil, err
}
mappedBuild, err := h.GetShiftedMapping(ctx, off)
if err != nil {
return nil, fmt.Errorf("failed to get mapping: %w", err)
}

return result, nil
// Pass empty huge page when the build id is nil.
if mappedBuild.BuildId == uuid.Nil {
return header.EmptyHugePage, nil
}
}

// retryOnTransition catches a PeerTransitionedError and swaps the header from
// storage. Returns (true, nil) to signal the caller should continue the loop,
// or (false, swapErr) if the swap itself failed. peerSeekable emits the
// transition error at most once per seekable, so the loop is naturally
// bounded — no retry counter needed here.
//
// The transition is signaled only after the source upload has finalized, so
// the header object already exists in storage. A single LoadHeader is enough;
// polling here would multiply GCS reads under high peer-transition rates.
func (b *File) retryOnTransition(ctx context.Context, err error) (bool, error) {
var transErr *storage.PeerTransitionedError
if !errors.As(err, &transErr) {
return false, nil
size := b.buildFileSize(h, mappedBuild.BuildId)
ft := h.GetBuildFrameData(mappedBuild.BuildId)
diff, err := b.getBuild(ctx, mappedBuild.BuildId, size, ft.CompressionType())
if err != nil {
return nil, fmt.Errorf("failed to get build: %w", err)
}

logger.L().Info(ctx, "peer transition detected, swapping header",
zap.String("file_type", string(b.fileType)),
)
return diff.Slice(ctx, int64(mappedBuild.Offset), int64(h.Metadata.BlockSize), ft)
}

hdrPath := storage.Paths{BuildID: b.Header().Metadata.BuildId.String()}.HeaderFile(string(b.fileType))
h, loadErr := header.LoadHeader(ctx, b.persistence, hdrPath)
if loadErr != nil {
return false, fmt.Errorf("failed to swap header: %w", loadErr)
// installPendingHeader CAS-installs a peer-delivered post-upload header if
// one is pending and returns the current header. Steady-state cost is one
// atomic Load (pointer equality short-circuits the CAS).
func (b *File) installPendingHeader() *header.Header {
cur := b.header.Load()
if b.headers == nil {
return cur
}
h := b.headers.PendingHeader(cur.Metadata.BuildId.String(), string(b.fileType))
if h == nil || h == cur {
return cur
}
if b.header.CompareAndSwap(cur, h) {
return h
}
b.SwapHeader(h)

return true, nil
return b.header.Load()
}

// buildFileSize returns the uncompressed file size for a build. Returns 0 for
Expand Down
15 changes: 15 additions & 0 deletions packages/orchestrator/pkg/sandbox/build_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,23 @@ type Upload struct {
root storage.CompressConfig
objectMetadata storage.ObjectMetadata
future *utils.ErrorOnce

// V4 header bytes captured during runV4 for inline delivery on
// post-upload UseStorage responses. Nil for V3 builds and for files
// whose upload failed. Written by runV4's per-fileType goroutine,
// read after Run() returns (happens-before via errgroup.Wait).
memfileHeader []byte
rootfsHeader []byte
}

// MemfileHeader returns the serialized V4 memfile header captured during a
// successful V4 upload, or nil for V3 builds. Safe to call after Run() returns.
func (u *Upload) MemfileHeader() []byte { return u.memfileHeader }

// RootfsHeader returns the serialized V4 rootfs header captured during a
// successful V4 upload, or nil for V3 builds. Safe to call after Run() returns.
func (u *Upload) RootfsHeader() []byte { return u.rootfsHeader }

func NewUpload(
ctx context.Context,
uploads *Uploads,
Expand Down
8 changes: 6 additions & 2 deletions packages/orchestrator/pkg/sandbox/build_upload_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,19 @@ func (u *Upload) runV3(ctx context.Context) error {
return nil
}

return headers.StoreHeader(egCtx, u.store, u.paths.MemfileHeader(), finalizeV3(u.snap.MemfileDiffHeader))
_, err := headers.StoreHeader(egCtx, u.store, u.paths.MemfileHeader(), finalizeV3(u.snap.MemfileDiffHeader))

return err
})

eg.Go(func() error {
if u.snap.RootfsDiffHeader == nil {
return nil
}

return headers.StoreHeader(egCtx, u.store, u.paths.RootfsHeader(), finalizeV3(u.snap.RootfsDiffHeader))
_, err := headers.StoreHeader(egCtx, u.store, u.paths.RootfsHeader(), finalizeV3(u.snap.RootfsDiffHeader))

return err
})

meta := storage.WithMetadata(u.objectMetadata)
Expand Down
13 changes: 12 additions & 1 deletion packages/orchestrator/pkg/sandbox/build_upload_v4.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,21 @@ func (u *Upload) uploadFramed(
}
h.Builds[u.buildID] = selfBuild

if err := headers.StoreHeader(ctx, u.store, u.paths.HeaderFile(string(fileType)), h); err != nil {
data, err := headers.StoreHeader(ctx, u.store, u.paths.HeaderFile(string(fileType)), h)
if err != nil {
return fmt.Errorf("store %s header: %w", fileType, err)
}

// Retain the just-serialized bytes for inline delivery on post-upload
// UseStorage responses. Each goroutine writes its own field; reads happen
// after Run() returns (errgroup.Wait happens-before).
switch fileType {
case build.Memfile:
u.memfileHeader = data
case build.Rootfs:
u.rootfsHeader = data
}

return u.publish(ctx, fileType, h)
}

Expand Down
57 changes: 31 additions & 26 deletions packages/orchestrator/pkg/sandbox/template/peerclient/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package peerclient

import (
"context"
"errors"
"fmt"
"io"
"sync"
"sync/atomic"

"go.uber.org/zap"

Expand Down Expand Up @@ -49,34 +47,39 @@ func (b *peerBlob) getBase(ctx context.Context) (storage.Blob, error) {
}

func (b *peerBlob) WriteTo(ctx context.Context, dst io.Writer) (int64, error) {
res, err := tryPeer(ctx, &b.peerHandle, "peer-blob-write-to", attrOpWriteTo,
n, hit, err := tryPeer(ctx, &b.peerHandle, "peer-blob-write-to", attrOpWriteTo,
func(ctx context.Context) (peerAttempt[int64], error) {
streamCtx, cancel := context.WithCancel(ctx)

recv, err := openPeerBlobStream(streamCtx, b.client, &orchestrator.GetBuildBlobRequest{
recv, outcome, err := openPeerBlobStream(streamCtx, b.client, &orchestrator.GetBuildBlobRequest{
BuildId: b.buildID,
Name: b.name,
}, b.uploaded)
}, b.state)
if err != nil {
cancel()
logger.L().Warn(ctx, "failed to open peer blob stream", logger.WithBuildID(b.buildID), zap.String("file_name", b.name), zap.Error(err))

return peerAttempt[int64]{}, nil
return peerAttempt[int64]{}, err
}
if outcome != served {
cancel()

return peerAttempt[int64]{result: outcome}, nil
}

reader := newPeerStreamReader(recv, cancel)
defer reader.Close()

n, err := io.Copy(dst, reader)
if err != nil {
return peerAttempt[int64]{value: n, bytes: n, hit: true},
return peerAttempt[int64]{value: n, bytes: n, result: served},
fmt.Errorf("failed to stream file %q from peer: %w", b.name, err)
}

return peerAttempt[int64]{value: n, bytes: n, hit: true}, nil
return peerAttempt[int64]{value: n, bytes: n, result: served}, nil
})
if res.hit {
return res.value, err
if hit {
return n, err
}

base, err := b.getBase(ctx)
Expand All @@ -88,24 +91,26 @@ func (b *peerBlob) WriteTo(ctx context.Context, dst io.Writer) (int64, error) {
}

func (b *peerBlob) Exists(ctx context.Context) (bool, error) {
res, err := tryPeer(ctx, &b.peerHandle, "peer-blob-exists", attrOpExists,
exists, hit, err := tryPeer(ctx, &b.peerHandle, "peer-blob-exists", attrOpExists,
func(ctx context.Context) (peerAttempt[bool], error) {
resp, err := b.client.GetBuildFileExists(ctx, &orchestrator.GetBuildFileExistsRequest{
BuildId: b.buildID,
Name: b.name,
})
if err == nil && checkPeerAvailability(resp.GetAvailability(), b.uploaded) {
return peerAttempt[bool]{value: true, hit: true}, nil
}

if err != nil {
logger.L().Warn(ctx, "failed to check build file exists from peer", logger.WithBuildID(b.buildID), zap.String("file_name", b.name), zap.Error(err))

return peerAttempt[bool]{}, err
}
outcome := checkPeerAvailability(ctx, resp.GetAvailability(), b.state, b.name)
if outcome != served {
return peerAttempt[bool]{result: outcome}, nil
}

return peerAttempt[bool]{}, nil
return peerAttempt[bool]{value: true, result: served}, nil
})
if res.hit {
return res.value, err
if hit {
return exists, err
}

base, err := b.getBase(ctx)
Expand Down Expand Up @@ -133,20 +138,20 @@ func openPeerBlobStream(
ctx context.Context,
client orchestrator.ChunkServiceClient,
req *orchestrator.GetBuildBlobRequest,
uploaded *atomic.Bool,
) (func() ([]byte, error), error) {
state *peerState,
) (func() ([]byte, error), result, error) {
stream, err := client.GetBuildBlob(ctx, req)
if err != nil {
return nil, fmt.Errorf("open blob stream: %w", err)
return nil, 0, fmt.Errorf("open blob stream: %w", err)
}

msg, err := stream.Recv()
if err != nil {
return nil, fmt.Errorf("recv first blob message: %w", err)
return nil, 0, fmt.Errorf("recv first blob message: %w", err)
}

if !checkPeerAvailability(msg.GetAvailability(), uploaded) {
return nil, errors.New("peer not available for blob stream")
if outcome := checkPeerAvailability(ctx, msg.GetAvailability(), state, req.GetName()); outcome != served {
return nil, outcome, nil
}

first := msg.GetData()
Expand All @@ -167,8 +172,8 @@ func openPeerBlobStream(
// Flip the uploaded flag if the peer signals use_storage; the current
// stream keeps reading from the peer, but subsequent operations will
// go directly to GCS.
checkPeerAvailability(m.GetAvailability(), uploaded)
_ = checkPeerAvailability(ctx, m.GetAvailability(), state, req.GetName())

return m.GetData(), nil
}, nil
}, served, nil
}
Loading
Loading