From afd3e65b283f2cb411e3ced5a77b7041ea3090d2 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Mon, 11 May 2026 11:23:25 -0700 Subject: [PATCH 1/3] =?UTF-8?q?fix(orch):=20P2P=20reader=20race=20on=20pee?= =?UTF-8?q?r=E2=86=92storage=20transition?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a peer signaled UseStorage to indicate its upload finished, the current reader saw `uploaded=true` and routed to base storage, but concurrent sibling reads on the same File could fall through with a stale FrameTable — V4 chunk offsets shift between the pre-finalize in-memory header and the post-finalize GCS header, so reads decoded the wrong bytes. The peer now ships the final V4 header inline on its UseStorage response (new `header_bytes` field on `PeerAvailability`). The bytes are the same `[]byte` produced once by `header.StoreHeader` during upload — captured on `*Upload` and forwarded into the server's `uploadedBuilds` cache, no second serialization, no failure path. Client side, the resolver's per-buildID state holds the parsed header. `build.File.installPendingHeader` polls the state at the top of every Slice/ReadAt iteration and CAS-installs the new header atomically. Concurrent readers either install the same value or observe it already installed — no coordination primitive needed, no GCS poll. Also: mid-stream peer abort (NotAvailable from cache eviction) now surfaces as `storage.ErrPeerAborted` so the chunker reopens via base on a single retry, instead of silently truncating the stream. --- packages/orchestrator/chunks.proto | 2 + .../pkg/sandbox/block/streaming_chunk.go | 4 + .../orchestrator/pkg/sandbox/build/build.go | 109 +++++------ .../orchestrator/pkg/sandbox/build_upload.go | 15 ++ .../pkg/sandbox/build_upload_v3.go | 8 +- .../pkg/sandbox/build_upload_v4.go | 13 +- .../pkg/sandbox/template/peerclient/blob.go | 61 +++--- .../sandbox/template/peerclient/blob_test.go | 76 ++++---- .../sandbox/template/peerclient/resolver.go | 111 +++++++---- .../sandbox/template/peerclient/seekable.go | 75 ++++---- .../template/peerclient/seekable_test.go | 173 +++++++++++------- .../sandbox/template/peerclient/storage.go | 144 +++++++++------ .../template/peerclient/storage_test.go | 5 +- packages/orchestrator/pkg/server/chunks.go | 41 +++-- packages/orchestrator/pkg/server/main.go | 7 +- packages/orchestrator/pkg/server/sandboxes.go | 5 +- .../shared/pkg/grpc/orchestrator/chunks.pb.go | 155 ++++++++-------- .../pkg/storage/header/serialization.go | 22 ++- packages/shared/pkg/storage/storage.go | 11 +- 19 files changed, 593 insertions(+), 444 deletions(-) diff --git a/packages/orchestrator/chunks.proto b/packages/orchestrator/chunks.proto index dee6f5ebd9..7c5975f574 100644 --- a/packages/orchestrator/chunks.proto +++ b/packages/orchestrator/chunks.proto @@ -16,6 +16,8 @@ message PeerAvailability { // caller should switch to reading from remote storage directly instead of // this peer. bool use_storage = 2; + // header_bytes carries the post-upload V4 header (framed files only). + bytes header_bytes = 3; } message GetBuildFileSizeRequest { diff --git a/packages/orchestrator/pkg/sandbox/block/streaming_chunk.go b/packages/orchestrator/pkg/sandbox/block/streaming_chunk.go index 0b3a02de17..5f782c4ab4 100644 --- a/packages/orchestrator/pkg/sandbox/block/streaming_chunk.go +++ b/packages/orchestrator/pkg/sandbox/block/streaming_chunk.go @@ -200,6 +200,10 @@ func (c *Chunker) runFetch(ctx context.Context, s *fetchSession, ft *storage.Fra fetchTimer := c.metrics.RemoteReadsTimerFactory.Begin() readBytes, err := c.progressiveRead(ctx, s, mmapSlice, ft) + // Retry once on ErrPeerAborted; second read will go to remote storage. + if errors.Is(err, storage.ErrPeerAborted) { + readBytes, err = c.progressiveRead(ctx, s, mmapSlice, ft) + } if err != nil { fetchTimer.RecordRaw(ctx, readBytes, attrs.remoteFailure) diff --git a/packages/orchestrator/pkg/sandbox/build/build.go b/packages/orchestrator/pkg/sandbox/build/build.go index fc38ea6a47..6565b8c3b9 100644 --- a/packages/orchestrator/pkg/sandbox/build/build.go +++ b/packages/orchestrator/pkg/sandbox/build/build.go @@ -4,13 +4,11 @@ package build import ( "context" - "errors" "fmt" "io" "sync/atomic" "github.com/google/uuid" - "go.uber.org/zap" blockmetrics "github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/block/metrics" "github.com/e2b-dev/infra/packages/shared/pkg/logger" @@ -18,8 +16,13 @@ import ( "github.com/e2b-dev/infra/packages/shared/pkg/storage/header" ) +type headerProvider interface { + PendingHeader(buildID, name string) *header.Header +} + type File struct { header atomic.Pointer[header.Header] + headers headerProvider store *DiffStore fileType DiffType persistence storage.StorageProvider @@ -33,10 +36,12 @@ func NewFile( persistence storage.StorageProvider, metrics blockmetrics.Metrics, ) *File { + hp, _ := persistence.(headerProvider) f := &File{ store: store, fileType: fileType, persistence: persistence, + headers: hp, metrics: metrics, } f.header.Store(header) @@ -52,9 +57,10 @@ func (b *File) SwapHeader(h *header.Header) { b.header.Store(h) } -func (b *File) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { +func (b *File) ReadAt(ctx context.Context, p []byte, off int64) (int, error) { + var n int for n < len(p) { - h := b.Header() + h := b.installPendingHeader() mappedToBuild, err := h.GetShiftedMapping(ctx, off+int64(n)) if err != nil { @@ -105,12 +111,6 @@ func (b *File) ReadAt(ctx context.Context, p []byte, off int64) (n int, err erro ft, ) if err != nil { - if retry, swapErr := b.retryOnTransition(ctx, err); retry { - continue - } else if swapErr != nil { - return 0, swapErr - } - return 0, fmt.Errorf("failed to read from source: %w", err) } @@ -122,68 +122,49 @@ func (b *File) ReadAt(ctx context.Context, p []byte, off int64) (n int, err erro // The slice access must be in the predefined blocksize of the build. func (b *File) Slice(ctx context.Context, off, _ int64) ([]byte, error) { - for { - h := b.Header() - - mappedBuild, err := h.GetShiftedMapping(ctx, off) - if err != nil { - return nil, fmt.Errorf("failed to get mapping: %w", err) - } - - // Pass empty huge page when the build id is nil. - if mappedBuild.BuildId == uuid.Nil { - return header.EmptyHugePage, nil - } - - size := b.buildFileSize(h, mappedBuild.BuildId) - ft := h.GetBuildFrameData(mappedBuild.BuildId) - diff, err := b.getBuild(ctx, mappedBuild.BuildId, size, ft.CompressionType()) - if err != nil { - return nil, fmt.Errorf("failed to get build: %w", err) - } - - result, err := diff.Slice(ctx, int64(mappedBuild.Offset), int64(h.Metadata.BlockSize), ft) - if err != nil { - if retry, swapErr := b.retryOnTransition(ctx, err); retry { - continue - } else if swapErr != nil { - return nil, swapErr - } + h := b.installPendingHeader() - return nil, err - } + mappedBuild, err := h.GetShiftedMapping(ctx, off) + if err != nil { + return nil, fmt.Errorf("failed to get mapping: %w", err) + } - return result, nil + // Pass empty huge page when the build id is nil. + if mappedBuild.BuildId == uuid.Nil { + return header.EmptyHugePage, nil } -} -// retryOnTransition catches a PeerTransitionedError and swaps the header from -// storage. Returns (true, nil) to signal the caller should continue the loop, -// or (false, swapErr) if the swap itself failed. peerSeekable emits the -// transition error at most once per seekable, so the loop is naturally -// bounded — no retry counter needed here. -// -// The transition is signaled only after the source upload has finalized, so -// the header object already exists in storage. A single LoadHeader is enough; -// polling here would multiply GCS reads under high peer-transition rates. -func (b *File) retryOnTransition(ctx context.Context, err error) (bool, error) { - var transErr *storage.PeerTransitionedError - if !errors.As(err, &transErr) { - return false, nil + size := b.buildFileSize(h, mappedBuild.BuildId) + ft := h.GetBuildFrameData(mappedBuild.BuildId) + diff, err := b.getBuild(ctx, mappedBuild.BuildId, size, ft.CompressionType()) + if err != nil { + return nil, fmt.Errorf("failed to get build: %w", err) } - logger.L().Info(ctx, "peer transition detected, swapping header", - zap.String("file_type", string(b.fileType)), - ) + return diff.Slice(ctx, int64(mappedBuild.Offset), int64(h.Metadata.BlockSize), ft) +} - hdrPath := storage.Paths{BuildID: b.Header().Metadata.BuildId.String()}.HeaderFile(string(b.fileType)) - h, loadErr := header.LoadHeader(ctx, b.persistence, hdrPath) - if loadErr != nil { - return false, fmt.Errorf("failed to swap header: %w", loadErr) +// installPendingHeader installs a header the peer delivered via UseStorage +// and returns the current header — either the freshly-installed one or the +// pre-existing one if no install was needed. Idempotent: concurrent readers +// all CAS the same value. Skips the CAS once the pending header is already +// installed (pointer equality), keeping the per-iteration cost to one atomic +// Load on the steady-state path. +func (b *File) installPendingHeader() *header.Header { + cur := b.header.Load() + if b.headers == nil { + return cur } - b.SwapHeader(h) - - return true, nil + h := b.headers.PendingHeader(cur.Metadata.BuildId.String(), string(b.fileType)) + if h == nil || h == cur { + return cur + } + if b.header.CompareAndSwap(cur, h) { + return h + } + // Lost the CAS — an external SwapHeader landed concurrently. Re-load to + // surface whatever's now authoritative. + return b.header.Load() } // buildFileSize returns the uncompressed file size for a build. Returns 0 for diff --git a/packages/orchestrator/pkg/sandbox/build_upload.go b/packages/orchestrator/pkg/sandbox/build_upload.go index 4490bb7758..2adbbffe1c 100644 --- a/packages/orchestrator/pkg/sandbox/build_upload.go +++ b/packages/orchestrator/pkg/sandbox/build_upload.go @@ -27,8 +27,23 @@ type Upload struct { root storage.CompressConfig objectMetadata storage.ObjectMetadata future *utils.ErrorOnce + + // V4 header bytes captured during runV4 for inline delivery on + // post-upload UseStorage responses. Nil for V3 builds and for files + // whose upload failed. Written by runV4's per-fileType goroutine, + // read after Run() returns (happens-before via errgroup.Wait). + memfileHeader []byte + rootfsHeader []byte } +// MemfileHeader returns the serialized V4 memfile header captured during a +// successful V4 upload, or nil for V3 builds. Safe to call after Run() returns. +func (u *Upload) MemfileHeader() []byte { return u.memfileHeader } + +// RootfsHeader returns the serialized V4 rootfs header captured during a +// successful V4 upload, or nil for V3 builds. Safe to call after Run() returns. +func (u *Upload) RootfsHeader() []byte { return u.rootfsHeader } + func NewUpload( ctx context.Context, uploads *Uploads, diff --git a/packages/orchestrator/pkg/sandbox/build_upload_v3.go b/packages/orchestrator/pkg/sandbox/build_upload_v3.go index 37852af30b..4c6905682f 100644 --- a/packages/orchestrator/pkg/sandbox/build_upload_v3.go +++ b/packages/orchestrator/pkg/sandbox/build_upload_v3.go @@ -31,7 +31,9 @@ func (u *Upload) runV3(ctx context.Context) error { return nil } - return headers.StoreHeader(egCtx, u.store, u.paths.MemfileHeader(), finalizeV3(u.snap.MemfileDiffHeader)) + _, err := headers.StoreHeader(egCtx, u.store, u.paths.MemfileHeader(), finalizeV3(u.snap.MemfileDiffHeader)) + + return err }) eg.Go(func() error { @@ -39,7 +41,9 @@ func (u *Upload) runV3(ctx context.Context) error { return nil } - return headers.StoreHeader(egCtx, u.store, u.paths.RootfsHeader(), finalizeV3(u.snap.RootfsDiffHeader)) + _, err := headers.StoreHeader(egCtx, u.store, u.paths.RootfsHeader(), finalizeV3(u.snap.RootfsDiffHeader)) + + return err }) meta := storage.WithMetadata(u.objectMetadata) diff --git a/packages/orchestrator/pkg/sandbox/build_upload_v4.go b/packages/orchestrator/pkg/sandbox/build_upload_v4.go index b57b7fa162..caaf735106 100644 --- a/packages/orchestrator/pkg/sandbox/build_upload_v4.go +++ b/packages/orchestrator/pkg/sandbox/build_upload_v4.go @@ -86,10 +86,21 @@ func (u *Upload) uploadFramed( } h.Builds[u.buildID] = selfBuild - if err := headers.StoreHeader(ctx, u.store, u.paths.HeaderFile(string(fileType)), h); err != nil { + data, err := headers.StoreHeader(ctx, u.store, u.paths.HeaderFile(string(fileType)), h) + if err != nil { return fmt.Errorf("store %s header: %w", fileType, err) } + // Retain the just-serialized bytes for inline delivery on post-upload + // UseStorage responses. Each goroutine writes its own field; reads happen + // after Run() returns (errgroup.Wait happens-before). + switch fileType { + case build.Memfile: + u.memfileHeader = data + case build.Rootfs: + u.rootfsHeader = data + } + return u.publish(ctx, fileType, h) } diff --git a/packages/orchestrator/pkg/sandbox/template/peerclient/blob.go b/packages/orchestrator/pkg/sandbox/template/peerclient/blob.go index 65891b2f70..76980a7a17 100644 --- a/packages/orchestrator/pkg/sandbox/template/peerclient/blob.go +++ b/packages/orchestrator/pkg/sandbox/template/peerclient/blob.go @@ -2,11 +2,9 @@ package peerclient import ( "context" - "errors" "fmt" "io" "sync" - "sync/atomic" "go.uber.org/zap" @@ -49,19 +47,24 @@ func (b *peerBlob) getBase(ctx context.Context) (storage.Blob, error) { } func (b *peerBlob) WriteTo(ctx context.Context, dst io.Writer) (int64, error) { - res, err := tryPeer(ctx, &b.peerHandle, "peer-blob-write-to", attrOpWriteTo, + n, hit, err := tryPeer(ctx, &b.peerHandle, "peer-blob-write-to", attrOpWriteTo, func(ctx context.Context) (peerAttempt[int64], error) { streamCtx, cancel := context.WithCancel(ctx) - recv, err := openPeerBlobStream(streamCtx, b.client, &orchestrator.GetBuildBlobRequest{ + recv, outcome, err := openPeerBlobStream(streamCtx, b.client, &orchestrator.GetBuildBlobRequest{ BuildId: b.buildID, Name: b.name, - }, b.uploaded) + }, b.state) if err != nil { cancel() logger.L().Warn(ctx, "failed to open peer blob stream", logger.WithBuildID(b.buildID), zap.String("file_name", b.name), zap.Error(err)) - return peerAttempt[int64]{}, nil + return peerAttempt[int64]{}, err + } + if outcome != served { + cancel() + + return peerAttempt[int64]{result: outcome}, nil } reader := newPeerStreamReader(recv, cancel) @@ -69,14 +72,14 @@ func (b *peerBlob) WriteTo(ctx context.Context, dst io.Writer) (int64, error) { n, err := io.Copy(dst, reader) if err != nil { - return peerAttempt[int64]{value: n, bytes: n, hit: true}, + return peerAttempt[int64]{value: n, bytes: n, result: served}, fmt.Errorf("failed to stream file %q from peer: %w", b.name, err) } - return peerAttempt[int64]{value: n, bytes: n, hit: true}, nil + return peerAttempt[int64]{value: n, bytes: n, result: served}, nil }) - if res.hit { - return res.value, err + if hit { + return n, err } base, err := b.getBase(ctx) @@ -88,24 +91,26 @@ func (b *peerBlob) WriteTo(ctx context.Context, dst io.Writer) (int64, error) { } func (b *peerBlob) Exists(ctx context.Context) (bool, error) { - res, err := tryPeer(ctx, &b.peerHandle, "peer-blob-exists", attrOpExists, + exists, hit, err := tryPeer(ctx, &b.peerHandle, "peer-blob-exists", attrOpExists, func(ctx context.Context) (peerAttempt[bool], error) { resp, err := b.client.GetBuildFileExists(ctx, &orchestrator.GetBuildFileExistsRequest{ BuildId: b.buildID, Name: b.name, }) - if err == nil && checkPeerAvailability(resp.GetAvailability(), b.uploaded) { - return peerAttempt[bool]{value: true, hit: true}, nil - } - if err != nil { logger.L().Warn(ctx, "failed to check build file exists from peer", logger.WithBuildID(b.buildID), zap.String("file_name", b.name), zap.Error(err)) + + return peerAttempt[bool]{}, err + } + outcome := checkPeerAvailability(resp.GetAvailability(), b.state, b.name) + if outcome != served { + return peerAttempt[bool]{result: outcome}, nil } - return peerAttempt[bool]{}, nil + return peerAttempt[bool]{value: true, result: served}, nil }) - if res.hit { - return res.value, err + if hit { + return exists, err } base, err := b.getBase(ctx) @@ -128,25 +133,26 @@ func (b *peerBlob) Put(ctx context.Context, data []byte, opts ...storage.PutOpti // openPeerBlobStream opens a GetBuildBlob stream, checks peer availability, // and returns a recv function that yields data chunks starting with the first message's data. +// Mid-stream availability changes are side-effect only; the in-flight stream completes from peer. // The passed context HAS to be canceled by the caller when done with the stream to avoid leaks. func openPeerBlobStream( ctx context.Context, client orchestrator.ChunkServiceClient, req *orchestrator.GetBuildBlobRequest, - uploaded *atomic.Bool, -) (func() ([]byte, error), error) { + state *peerState, +) (func() ([]byte, error), result, error) { stream, err := client.GetBuildBlob(ctx, req) if err != nil { - return nil, fmt.Errorf("open blob stream: %w", err) + return nil, 0, fmt.Errorf("open blob stream: %w", err) } msg, err := stream.Recv() if err != nil { - return nil, fmt.Errorf("recv first blob message: %w", err) + return nil, 0, fmt.Errorf("recv first blob message: %w", err) } - if !checkPeerAvailability(msg.GetAvailability(), uploaded) { - return nil, errors.New("peer not available for blob stream") + if outcome := checkPeerAvailability(msg.GetAvailability(), state, req.GetName()); outcome != served { + return nil, outcome, nil } first := msg.GetData() @@ -164,11 +170,8 @@ func openPeerBlobStream( return nil, err } - // Flip the uploaded flag if the peer signals use_storage; the current - // stream keeps reading from the peer, but subsequent operations will - // go directly to GCS. - checkPeerAvailability(m.GetAvailability(), uploaded) + _ = checkPeerAvailability(m.GetAvailability(), state, req.GetName()) return m.GetData(), nil - }, nil + }, served, nil } diff --git a/packages/orchestrator/pkg/sandbox/template/peerclient/blob_test.go b/packages/orchestrator/pkg/sandbox/template/peerclient/blob_test.go index 2038326329..b348f68eab 100644 --- a/packages/orchestrator/pkg/sandbox/template/peerclient/blob_test.go +++ b/packages/orchestrator/pkg/sandbox/template/peerclient/blob_test.go @@ -5,7 +5,6 @@ import ( "context" "errors" "io" - "sync/atomic" "testing" "github.com/stretchr/testify/assert" @@ -31,10 +30,10 @@ func TestPeerBlob_WriteTo_PeerSucceeds(t *testing.T) { })).Return(stream, nil) blob := &peerBlob{peerHandle: peerHandle{ - client: client, - buildID: "build-1", - name: "snapfile", - uploaded: &atomic.Bool{}, + client: client, + buildID: "build-1", + name: "snapfile", + state: &peerState{}, }} var buf bytes.Buffer @@ -64,10 +63,10 @@ func TestPeerBlob_WriteTo_PeerNotAvailable_FallsBackToBase(t *testing.T) { blob := &peerBlob{ peerHandle: peerHandle{ - client: client, - buildID: "build-1", - name: "snapfile", - uploaded: &atomic.Bool{}, + client: client, + buildID: "build-1", + name: "snapfile", + state: &peerState{}, }, openBase: func(ctx context.Context) (storage.Blob, error) { return base.OpenBlob(ctx, "build-1/snapfile", storage.SnapfileObjectType) @@ -98,10 +97,10 @@ func TestPeerBlob_WriteTo_PeerError_FallsBackToBase(t *testing.T) { blob := &peerBlob{ peerHandle: peerHandle{ - client: client, - buildID: "build-1", - name: "snapfile", - uploaded: &atomic.Bool{}, + client: client, + buildID: "build-1", + name: "snapfile", + state: &peerState{}, }, openBase: func(ctx context.Context) (storage.Blob, error) { return base.OpenBlob(ctx, "build-1/snapfile", storage.SnapfileObjectType) @@ -117,17 +116,17 @@ func TestPeerBlob_WriteTo_PeerError_FallsBackToBase(t *testing.T) { func TestPeerBlob_WriteTo_UploadedSetMidStream_CompletesFromPeerThenFallsBack(t *testing.T) { t.Parallel() - uploaded := &atomic.Bool{} + state := &peerState{} - // Peer streams three chunks; the second Recv sets uploaded=true - // (simulating a concurrent operation receiving UseStorage). + // Peer streams three chunks; the second Recv carries UseStorage as a + // side-effect signal (Blob recv loop flips state.uploaded but completes + // the in-flight stream). stream := orchestratormocks.NewMockChunkService_GetBuildBlobClient(t) stream.EXPECT().Recv().Return(&orchestrator.GetBuildBlobResponse{Data: []byte("aaa")}, nil).Once() - stream.EXPECT().Recv().RunAndReturn(func() (*orchestrator.GetBuildBlobResponse, error) { - uploaded.Store(true) - - return &orchestrator.GetBuildBlobResponse{Data: []byte("bbb")}, nil - }).Once() + stream.EXPECT().Recv().Return(&orchestrator.GetBuildBlobResponse{ + Data: []byte("bbb"), + Availability: &orchestrator.PeerAvailability{UseStorage: true}, + }, nil).Once() stream.EXPECT().Recv().Return(&orchestrator.GetBuildBlobResponse{Data: []byte("ccc")}, nil).Once() stream.EXPECT().Recv().Return(nil, io.EOF).Once() @@ -145,23 +144,23 @@ func TestPeerBlob_WriteTo_UploadedSetMidStream_CompletesFromPeerThenFallsBack(t blob := &peerBlob{ peerHandle: peerHandle{ - client: client, - buildID: "build-1", - name: "snapfile", - uploaded: uploaded, + client: client, + buildID: "build-1", + name: "snapfile", + state: state, }, openBase: func(ctx context.Context) (storage.Blob, error) { return base.OpenBlob(ctx, "build-1/snapfile", storage.SnapfileObjectType) }, } - // First download: in-flight stream completes from peer despite uploaded being set mid-stream. + // First download: in-flight stream completes from peer despite UseStorage mid-stream. var buf1 bytes.Buffer n1, err := blob.WriteTo(t.Context(), &buf1) require.NoError(t, err) assert.Equal(t, int64(9), n1) assert.Equal(t, "aaabbbccc", buf1.String()) - assert.True(t, uploaded.Load()) + assert.True(t, state.uploaded.Load()) // Second download: uploaded is now true, skips peer and goes to base storage. var buf2 bytes.Buffer @@ -179,7 +178,7 @@ func TestPeerBlob_Exists_PeerHasFile(t *testing.T) { return req.GetBuildId() == "build-1" && req.GetName() == "snapfile" })).Return(&orchestrator.GetBuildFileExistsResponse{}, nil) - blob := &peerBlob{peerHandle: peerHandle{client: client, buildID: "build-1", name: "snapfile", uploaded: &atomic.Bool{}}} + blob := &peerBlob{peerHandle: peerHandle{client: client, buildID: "build-1", name: "snapfile", state: &peerState{}}} ok, err := blob.Exists(t.Context()) require.NoError(t, err) assert.True(t, ok) @@ -198,10 +197,10 @@ func TestPeerBlob_Exists_PeerNotAvailable_FallsBackToBase(t *testing.T) { blob := &peerBlob{ peerHandle: peerHandle{ - client: client, - buildID: "build-1", - name: "snapfile", - uploaded: &atomic.Bool{}, + client: client, + buildID: "build-1", + name: "snapfile", + state: &peerState{}, }, openBase: func(ctx context.Context) (storage.Blob, error) { return base.OpenBlob(ctx, "build-1/snapfile", storage.SnapfileObjectType) @@ -224,13 +223,13 @@ func TestPeerBlob_Exists_UseStorage_FallsBackToBase(t *testing.T) { base := storage.NewMockStorageProvider(t) base.EXPECT().OpenBlob(mock.Anything, "build-1/snapfile", storage.SnapfileObjectType).Return(baseBlob, nil) - uploaded := &atomic.Bool{} + state := &peerState{} blob := &peerBlob{ peerHandle: peerHandle{ - client: client, - buildID: "build-1", - name: "snapfile", - uploaded: uploaded, + client: client, + buildID: "build-1", + name: "snapfile", + state: state, }, openBase: func(ctx context.Context) (storage.Blob, error) { return base.OpenBlob(ctx, "build-1/snapfile", storage.SnapfileObjectType) @@ -240,5 +239,6 @@ func TestPeerBlob_Exists_UseStorage_FallsBackToBase(t *testing.T) { ok, err := blob.Exists(t.Context()) require.NoError(t, err) assert.True(t, ok) - assert.True(t, uploaded.Load(), "uploaded flag should be set after UseStorage response") + assert.True(t, state.uploaded.Load(), "uploaded flag should be set after UseStorage response") + assert.Nil(t, state.header("snapfile"), "Blob path with empty header_bytes must not stash a header") } diff --git a/packages/orchestrator/pkg/sandbox/template/peerclient/resolver.go b/packages/orchestrator/pkg/sandbox/template/peerclient/resolver.go index b919e50ceb..f99c11fb6a 100644 --- a/packages/orchestrator/pkg/sandbox/template/peerclient/resolver.go +++ b/packages/orchestrator/pkg/sandbox/template/peerclient/resolver.go @@ -15,6 +15,8 @@ import ( e2bgrpc "github.com/e2b-dev/infra/packages/shared/pkg/grpc" "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator" + "github.com/e2b-dev/infra/packages/shared/pkg/storage" + "github.com/e2b-dev/infra/packages/shared/pkg/storage/header" ) const peerConnectTimeout = 5 * time.Second @@ -27,14 +29,51 @@ const peerConnectTimeout = 5 * time.Second type Resolver interface { resolve(ctx context.Context, buildID string) (attribute.KeyValue, resolveResult) IsActive(buildID string) bool + PendingHeader(buildID, name string) *header.Header Purge(buildID string) Close() } +// peerState is per-buildID state shared across every peer{Blob,Seekable}. +// uploaded routes future reads to base; memfile/rootfs hold V4 headers +// delivered in UseStorage responses for build.File to install. +type peerState struct { + uploaded atomic.Bool + memfile atomic.Pointer[header.Header] + rootfs atomic.Pointer[header.Header] +} + +func (b *peerState) setHeader(name string, h *header.Header) { + switch name { + case storage.MemfileName: + b.memfile.Store(h) + case storage.RootfsName: + b.rootfs.Store(h) + } +} + +func (b *peerState) header(name string) *header.Header { + // Gate on uploaded: writer sequences setHeader before uploaded.Store(true); + // a reader observing uploaded=true is guaranteed (Go atomic ordering) to + // see the prior header store on its subsequent atomic load. V3 stores + // nothing — header() correctly returns nil for those builds. + if !b.uploaded.Load() { + return nil + } + switch name { + case storage.MemfileName: + return b.memfile.Load() + case storage.RootfsName: + return b.rootfs.Load() + } + + return nil +} + type resolveResult struct { - client orchestrator.ChunkServiceClient - uploaded *atomic.Bool - addr string + client orchestrator.ChunkServiceClient + state *peerState + addr string } // NopResolver returns a Resolver that always falls back to the base provider. @@ -45,16 +84,17 @@ type nopResolver struct{} func (nopResolver) resolve(context.Context, string) (attribute.KeyValue, resolveResult) { return attrResolveNoPeer, resolveResult{} } -func (nopResolver) IsActive(string) bool { return false } -func (nopResolver) Purge(string) {} -func (nopResolver) Close() {} +func (nopResolver) IsActive(string) bool { return false } +func (nopResolver) PendingHeader(string, string) *header.Header { return nil } +func (nopResolver) Purge(string) {} +func (nopResolver) Close() {} // peerResolver is the real implementation that looks up peers via the Registry. type peerResolver struct { registry Registry selfAddress string peerConns sync.Map // address → *grpc.ClientConn - uploadedBuilds sync.Map // buildID → *atomic.Bool + uploadedBuilds sync.Map // buildID → *peerState dialGroup singleflight.Group } @@ -106,36 +146,31 @@ func (r *peerResolver) isSelfAddress(address string) bool { return address == r.selfAddress } -// peerFlag returns the shared atomic "switched-to-storage" flag for buildID, -// creating it on first call. The presence of an entry in uploadedBuilds means -// "this build is/was peer-served on this orch"; the flag's value tracks -// whether a reader has observed the source switch to storage. Only resolve() -// creates entries (in the peer-found branch) so absence is meaningful: no -// peer ever existed for this build from this orch's perspective. -func (r *peerResolver) peerFlag(buildID string) *atomic.Bool { - if v, ok := r.uploadedBuilds.Load(buildID); ok { - return v.(*atomic.Bool) - } - - flag := &atomic.Bool{} - actual, _ := r.uploadedBuilds.LoadOrStore(buildID, flag) +// peerState returns the shared per-build state, creating it on first call. +// The presence of an entry in builds means "this build is/was peer-served on +// this orch"; the uploaded flag tracks whether a reader has observed the +// source switch to storage. Only resolve() creates entries (in the peer-found +// branch) so absence is meaningful: no peer ever existed for this build from +// this orch's perspective. +func (r *peerResolver) peerState(buildID string) *peerState { + actual, _ := r.uploadedBuilds.LoadOrStore(buildID, &peerState{}) - return actual.(*atomic.Bool) + return actual.(*peerState) } -// Purge removes the uploaded state for a build, called on template -// cache eviction so the entry doesn't accumulate forever. +// Purge removes the per-build state, called on template cache eviction so +// the entry doesn't accumulate forever. func (r *peerResolver) Purge(buildID string) { r.uploadedBuilds.Delete(buildID) } // resolve looks up the peer for the given build and returns a gRPC client if -// a remote peer is found. Returns a nil client when the base provider should +// a remote peer is found. Returns a zero result when the base provider should // be used instead (uploaded, no peer, self, or error). func (r *peerResolver) resolve(ctx context.Context, buildID string) (attribute.KeyValue, resolveResult) { // Fast path: a prior resolve flagged this build as peer-served and a // reader has since observed the switch to storage. - if v, ok := r.uploadedBuilds.Load(buildID); ok && v.(*atomic.Bool).Load() { + if v, ok := r.uploadedBuilds.Load(buildID); ok && v.(*peerState).uploaded.Load() { return attrResolveUploaded, resolveResult{} } @@ -157,17 +192,12 @@ func (r *peerResolver) resolve(ctx context.Context, buildID string) (attribute.K return attrResolveDialError, resolveResult{} } - // Peer found and dialable — register the flag now so IsActive and - // future resolves can answer locally without touching Redis. - uploaded := r.peerFlag(buildID) - if uploaded.Load() { - return attrResolveUploaded, resolveResult{} - } - + // Peer found and dialable — register state now so IsActive and future + // resolves can answer locally without touching Redis. return attrResolvePeer, resolveResult{ - client: orchestrator.NewChunkServiceClient(conn), - uploaded: uploaded, - addr: addr, + client: orchestrator.NewChunkServiceClient(conn), + state: r.peerState(buildID), + addr: addr, } } @@ -180,7 +210,16 @@ func (r *peerResolver) resolve(ctx context.Context, buildID string) (attribute.K func (r *peerResolver) IsActive(buildID string) bool { v, ok := r.uploadedBuilds.Load(buildID) - return ok && !v.(*atomic.Bool).Load() + return ok && !v.(*peerState).uploaded.Load() +} + +func (r *peerResolver) PendingHeader(buildID, name string) *header.Header { + v, ok := r.uploadedBuilds.Load(buildID) + if !ok { + return nil + } + + return v.(*peerState).header(name) } func (r *peerResolver) Close() { diff --git a/packages/orchestrator/pkg/sandbox/template/peerclient/seekable.go b/packages/orchestrator/pkg/sandbox/template/peerclient/seekable.go index 35fd45b1ad..2e4d97c6e4 100644 --- a/packages/orchestrator/pkg/sandbox/template/peerclient/seekable.go +++ b/packages/orchestrator/pkg/sandbox/template/peerclient/seekable.go @@ -2,11 +2,9 @@ package peerclient import ( "context" - "errors" "fmt" "io" "sync" - "sync/atomic" "go.uber.org/zap" @@ -32,14 +30,6 @@ type peerSeekable struct { base storage.Seekable baseCT storage.CompressionType loaded bool - - // transitionEmitted ensures we signal PeerTransitionedError at most once - // after the peer flips uploaded=true. The caller (build.File) reacts by - // loading the post-upload header from storage; whether that ends up V4 - // (compressed) or V3 (no upgrade) determines how subsequent reads route. - // Either way, after the first emission we fall through to base so V3 - // builds don't loop forever against PeerTransitionedError. - transitionEmitted atomic.Bool } // getBase returns a base Seekable opened against the storage path composed @@ -68,24 +58,26 @@ func (s *peerSeekable) getBase(ctx context.Context, ct storage.CompressionType) } func (s *peerSeekable) Size(ctx context.Context) (int64, error) { - res, err := tryPeer(ctx, &s.peerHandle, "size peer-seekable", attrOpSize, + size, hit, err := tryPeer(ctx, &s.peerHandle, "size peer-seekable", attrOpSize, func(ctx context.Context) (peerAttempt[int64], error) { resp, err := s.client.GetBuildFileSize(ctx, &orchestrator.GetBuildFileSizeRequest{ BuildId: s.buildID, Name: s.name, }) - if err == nil && checkPeerAvailability(resp.GetAvailability(), s.uploaded) { - return peerAttempt[int64]{value: resp.GetTotalSize(), hit: true}, nil - } - if err != nil { logger.L().Warn(ctx, "failed to get build file size from peer", logger.WithBuildID(s.buildID), zap.Error(err)) + + return peerAttempt[int64]{}, err + } + outcome := checkPeerAvailability(resp.GetAvailability(), s.state, s.name) + if outcome != served { + return peerAttempt[int64]{result: outcome}, nil } - return peerAttempt[int64]{}, nil + return peerAttempt[int64]{value: resp.GetTotalSize(), result: served}, nil }) - if res.hit { - return res.value, err + if hit { + return size, err } // Size only reaches base for V3 builds (uncompressedSize unknown); @@ -100,34 +92,35 @@ func (s *peerSeekable) Size(ctx context.Context) (int64, error) { } func (s *peerSeekable) OpenRangeReader(ctx context.Context, off int64, length int64, frameTable *storage.FrameTable) (io.ReadCloser, error) { - res, err := tryPeer(ctx, &s.peerHandle, "peer-seekable-open-range-reader", attrOpRangeReader, + rc, hit, err := tryPeer(ctx, &s.peerHandle, "peer-seekable-open-range-reader", attrOpRangeReader, func(ctx context.Context) (peerAttempt[io.ReadCloser], error) { streamCtx, cancel := context.WithCancel(ctx) - recv, err := openPeerSeekableStream(streamCtx, s.client, &orchestrator.ReadAtBuildSeekableRequest{ + recv, outcome, err := openPeerSeekableStream(streamCtx, s.client, &orchestrator.ReadAtBuildSeekableRequest{ BuildId: s.buildID, Name: s.name, Offset: off, Length: length, - }, s.uploaded) + }, s.state) if err != nil { + cancel() logger.L().Warn(ctx, "failed to open range reader from peer", logger.WithBuildID(s.buildID), zap.Int64("off", off), zap.Int64("length", length), zap.Error(err)) + + return peerAttempt[io.ReadCloser]{}, err + } + if outcome != served { cancel() - return peerAttempt[io.ReadCloser]{}, nil + return peerAttempt[io.ReadCloser]{result: outcome}, nil } return peerAttempt[io.ReadCloser]{ - value: newPeerStreamReader(recv, cancel), - hit: true, + value: newPeerStreamReader(recv, cancel), + result: served, }, nil }) - if res.hit { - return res.value, err - } - - if s.uploaded != nil && s.uploaded.Load() && s.transitionEmitted.CompareAndSwap(false, true) { - return nil, &storage.PeerTransitionedError{} + if hit { + return rc, err } base, err := s.getBase(ctx, frameTable.CompressionType()) @@ -151,25 +144,26 @@ func (s *peerSeekable) StoreFile(context.Context, string, ...storage.PutOption) // openPeerSeekableStream opens a ReadAtBuildSeekable stream, checks peer availability, // and returns a recv function that yields data chunks starting with the first message's data. +// Mid-stream non-Served signals abort via storage.ErrPeerAborted; the caller reopens via base. // The passed context HAS to be canceled by the caller when done with the stream to avoid leaks. func openPeerSeekableStream( ctx context.Context, client orchestrator.ChunkServiceClient, req *orchestrator.ReadAtBuildSeekableRequest, - uploaded *atomic.Bool, -) (func() ([]byte, error), error) { + state *peerState, +) (func() ([]byte, error), result, error) { stream, err := client.ReadAtBuildSeekable(ctx, req) if err != nil { - return nil, fmt.Errorf("open seekable stream: %w", err) + return nil, 0, fmt.Errorf("open seekable stream: %w", err) } msg, err := stream.Recv() if err != nil { - return nil, fmt.Errorf("recv first seekable message: %w", err) + return nil, 0, fmt.Errorf("recv first seekable message: %w", err) } - if !checkPeerAvailability(msg.GetAvailability(), uploaded) { - return nil, errors.New("peer not available for seekable stream") + if outcome := checkPeerAvailability(msg.GetAvailability(), state, req.GetName()); outcome != served { + return nil, outcome, nil } first := msg.GetData() @@ -187,11 +181,10 @@ func openPeerSeekableStream( return nil, err } - // Flip the uploaded flag if the peer signals use_storage; the current - // stream keeps reading from the peer, but subsequent operations will - // go directly to GCS. - checkPeerAvailability(m.GetAvailability(), uploaded) + if checkPeerAvailability(m.GetAvailability(), state, req.GetName()) != served { + return nil, storage.ErrPeerAborted + } return m.GetData(), nil - }, nil + }, served, nil } diff --git a/packages/orchestrator/pkg/sandbox/template/peerclient/seekable_test.go b/packages/orchestrator/pkg/sandbox/template/peerclient/seekable_test.go index b5266e9094..78e2470f0f 100644 --- a/packages/orchestrator/pkg/sandbox/template/peerclient/seekable_test.go +++ b/packages/orchestrator/pkg/sandbox/template/peerclient/seekable_test.go @@ -4,9 +4,9 @@ import ( "bytes" "errors" "io" - "sync/atomic" "testing" + "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -14,6 +14,7 @@ import ( "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator" orchestratormocks "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator/mocks" "github.com/e2b-dev/infra/packages/shared/pkg/storage" + "github.com/e2b-dev/infra/packages/shared/pkg/storage/header" ) func TestPeerSeekable_Size_PeerSucceeds(t *testing.T) { @@ -24,7 +25,7 @@ func TestPeerSeekable_Size_PeerSucceeds(t *testing.T) { return req.GetBuildId() == "build-1" && req.GetName() == storage.MemfileName })).Return(&orchestrator.GetBuildFileSizeResponse{TotalSize: 4096}, nil) - s := &peerSeekable{peerHandle: peerHandle{client: client, buildID: "build-1", name: storage.MemfileName, uploaded: &atomic.Bool{}}} + s := &peerSeekable{peerHandle: peerHandle{client: client, buildID: "build-1", name: storage.MemfileName, state: &peerState{}}} size, err := s.Size(t.Context()) require.NoError(t, err) assert.Equal(t, int64(4096), size) @@ -44,10 +45,10 @@ func TestPeerSeekable_Size_PeerNotAvailable_FallsBackToBase(t *testing.T) { s := &peerSeekable{ peerHandle: peerHandle{ - client: client, - buildID: "build-1", - name: storage.MemfileName, - uploaded: &atomic.Bool{}, + client: client, + buildID: "build-1", + name: storage.MemfileName, + state: &peerState{}, }, basePersistence: base, objType: storage.MemfileObjectType, @@ -71,7 +72,7 @@ func TestPeerSeekable_OpenRangeReader_PeerSucceeds(t *testing.T) { return req.GetOffset() == 10 && req.GetLength() == int64(len(data)) })).Return(stream, nil) - s := &peerSeekable{peerHandle: peerHandle{client: client, buildID: "build-1", name: storage.MemfileName, uploaded: &atomic.Bool{}}} + s := &peerSeekable{peerHandle: peerHandle{client: client, buildID: "build-1", name: storage.MemfileName, state: &peerState{}}} rc, err := s.OpenRangeReader(t.Context(), 10, int64(len(data)), nil) require.NoError(t, err) defer rc.Close() @@ -96,10 +97,10 @@ func TestPeerSeekable_OpenRangeReader_PeerError_FallsBackToBase(t *testing.T) { s := &peerSeekable{ peerHandle: peerHandle{ - client: client, - buildID: "build-1", - name: storage.MemfileName, - uploaded: &atomic.Bool{}, + client: client, + buildID: "build-1", + name: storage.MemfileName, + state: &peerState{}, }, basePersistence: base, objType: storage.MemfileObjectType, @@ -113,77 +114,78 @@ func TestPeerSeekable_OpenRangeReader_PeerError_FallsBackToBase(t *testing.T) { assert.Equal(t, baseData, got) } -func TestPeerSeekable_OpenRangeReader_Uploaded_ReturnsPeerTransitionedError(t *testing.T) { +func TestPeerSeekable_OpenRangeReader_Uploaded_RoutesToBase(t *testing.T) { t.Parallel() - // Once uploaded flips, the first OpenRangeReader returns - // PeerTransitionedError without touching either the peer or base. The - // caller is expected to refresh its header and retry. + // Once uploaded flips, OpenRangeReader skips the peer and routes to base + // directly. Header coordination is the File's job (via + // storage.HeaderReloadProbe + ensureHeaderSwapped), not peerSeekable's. + baseData := []byte("base range") client := orchestratormocks.NewMockChunkServiceClient(t) - base := storage.NewMockStorageProvider(t) - uploaded := &atomic.Bool{} - uploaded.Store(true) + state := &peerState{} + state.uploaded.Store(true) + + baseSeekable := storage.NewMockSeekable(t) + baseSeekable.EXPECT(). + OpenRangeReader(mock.Anything, int64(0), int64(len(baseData)), mock.Anything). + Return(io.NopCloser(bytes.NewReader(baseData)), nil).Once() + + base := storage.NewMockStorageProvider(t) + base.EXPECT(). + OpenSeekable(mock.Anything, "build-1/memfile", storage.MemfileObjectType). + Return(baseSeekable, nil).Once() s := &peerSeekable{ peerHandle: peerHandle{ - client: client, - buildID: "build-1", - name: storage.MemfileName, - uploaded: uploaded, + client: client, + buildID: "build-1", + name: storage.MemfileName, + state: state, }, basePersistence: base, objType: storage.MemfileObjectType, } - _, err := s.OpenRangeReader(t.Context(), 0, 100, nil) - require.Error(t, err) - - var transErr *storage.PeerTransitionedError - require.ErrorAs(t, err, &transErr) + rc, err := s.OpenRangeReader(t.Context(), 0, int64(len(baseData)), + storage.NewFrameTable(storage.CompressionNone, nil)) + require.NoError(t, err) + defer rc.Close() + got, err := io.ReadAll(rc) + require.NoError(t, err) + assert.Equal(t, baseData, got) } -// TestPeerStorageProvider_FullTransitionFlow walks the whole peerclient -// surface across a peer→storage transition with a header swap from V3 (basic -// path) to V4 (zstd-compressed path). Regression cover for the bug where the -// post-transition read kept hitting the original uncompressed path. -// -// Sequence: -// 1. Pre-transition: caller passes ft={ct=None}; peer answers; bytes flow. -// 2. Peer signals UseStorage; uploaded flips to true. -// 3. First post-transition call: peerSeekable returns PeerTransitionedError -// immediately (no peer call, no base open). -// 4. Caller (build.File.retryOnTransition, simulated here) reloads the V4 -// header and retries with ft={ct=Zstd}. -// 5. peerSeekable falls through to base, which opens "build-1/memfile.zstd" -// (not "build-1/memfile") and serves the compressed bytes. func TestPeerStorageProvider_FullTransitionFlow(t *testing.T) { t.Parallel() - uploaded := &atomic.Bool{} + state := &peerState{} + + buildUUID := uuid.New() + v4Header, err := header.NewHeader(&header.Metadata{ + Version: header.MetadataVersionV4, + BuildId: buildUUID, + BlockSize: 4096, + Size: 4096, + }, nil) + require.NoError(t, err) + headerBytes, err := header.SerializeHeader(v4Header) + require.NoError(t, err) prePeerBytes := []byte("pre-transition peer payload") postBaseBytes := []byte("post-transition compressed payload") - // Pre-transition peer stream: serves bytes once, then EOF. uploaded is - // flipped via UseStorage on the EOF response so subsequent calls skip - // the peer. preStream := orchestratormocks.NewMockChunkService_ReadAtBuildSeekableClient(t) preStream.EXPECT().Recv().Return(&orchestrator.ReadAtBuildSeekableResponse{Data: prePeerBytes}, nil).Once() - preStream.EXPECT().Recv().RunAndReturn(func() (*orchestrator.ReadAtBuildSeekableResponse, error) { - uploaded.Store(true) - - return nil, io.EOF - }).Once() + preStream.EXPECT().Recv().Return(&orchestrator.ReadAtBuildSeekableResponse{ + Availability: &orchestrator.PeerAvailability{UseStorage: true, HeaderBytes: headerBytes}, + }, nil).Once() client := orchestratormocks.NewMockChunkServiceClient(t) client.EXPECT().ReadAtBuildSeekable(mock.Anything, mock.MatchedBy(func(req *orchestrator.ReadAtBuildSeekableRequest) bool { - // Peer is asked by basic name only. return req.GetBuildId() == "build-1" && req.GetName() == storage.MemfileName })).Return(preStream, nil).Once() - // Base is only consulted post-transition, and only against the compressed - // path. If the bug regresses (uncompressed path), this expectation fails. postBaseSeekable := storage.NewMockSeekable(t) postBaseSeekable.EXPECT(). OpenRangeReader(mock.Anything, int64(0), int64(len(postBaseBytes)), mock.Anything). @@ -194,33 +196,62 @@ func TestPeerStorageProvider_FullTransitionFlow(t *testing.T) { OpenSeekable(mock.Anything, "build-1/memfile.zstd", storage.MemfileObjectType). Return(postBaseSeekable, nil).Once() - p := newPeerStorageProvider(base, client, uploaded) + p := newPeerStorageProvider(base, client, state) seekable, err := p.OpenSeekable(t.Context(), "build-1/memfile", storage.MemfileObjectType) require.NoError(t, err) - // 1. Pre-transition read via peer. ft={ct=None} (V3 header). - rc, err := seekable.OpenRangeReader(t.Context(), 0, int64(len(prePeerBytes)), - storage.NewFrameTable(storage.CompressionNone, nil)) + ftZstd := storage.NewFrameTable(storage.CompressionZstd, nil) + rc, err := seekable.OpenRangeReader(t.Context(), 0, int64(len(prePeerBytes)), ftZstd) require.NoError(t, err) - got, err := io.ReadAll(rc) + _, _ = io.ReadAll(rc) + require.NoError(t, rc.Close()) + + require.True(t, state.uploaded.Load(), "uploaded should be set after UseStorage") + got := state.header(storage.MemfileName) + require.NotNil(t, got, "pending header should be stashed") + require.Equal(t, buildUUID, got.Metadata.BuildId) + + rc, err = seekable.OpenRangeReader(t.Context(), 0, int64(len(postBaseBytes)), ftZstd) + require.NoError(t, err) + gotBytes, err := io.ReadAll(rc) require.NoError(t, err) require.NoError(t, rc.Close()) - assert.Equal(t, prePeerBytes, got) - require.True(t, uploaded.Load(), "uploaded flag should be set after peer EOF with UseStorage") + assert.Equal(t, postBaseBytes, gotBytes) +} - // 2. First post-transition call: retriable error, no peer/base contact. - _, err = seekable.OpenRangeReader(t.Context(), 0, 1, - storage.NewFrameTable(storage.CompressionNone, nil)) - var transErr *storage.PeerTransitionedError - require.ErrorAs(t, err, &transErr) +// V3 transition: server attaches no header bytes; client must not stash anything. +func TestPeerStorageProvider_V3Transition_NoPendingHeader(t *testing.T) { + t.Parallel() + + state := &peerState{} + + preStream := orchestratormocks.NewMockChunkService_ReadAtBuildSeekableClient(t) + preStream.EXPECT().Recv().Return(&orchestrator.ReadAtBuildSeekableResponse{ + Availability: &orchestrator.PeerAvailability{UseStorage: true}, + }, nil).Once() + + client := orchestratormocks.NewMockChunkServiceClient(t) + client.EXPECT().ReadAtBuildSeekable(mock.Anything, mock.Anything).Return(preStream, nil).Once() + + baseSeekable := storage.NewMockSeekable(t) + baseSeekable.EXPECT(). + OpenRangeReader(mock.Anything, int64(0), int64(10), mock.Anything). + Return(io.NopCloser(bytes.NewReader(make([]byte, 10))), nil).Once() + + base := storage.NewMockStorageProvider(t) + base.EXPECT(). + OpenSeekable(mock.Anything, "build-1/memfile", storage.MemfileObjectType). + Return(baseSeekable, nil).Once() - // 3. Caller reloads V4 header and retries with ct=Zstd. This must hit the - // compressed path on base. - rc, err = seekable.OpenRangeReader(t.Context(), 0, int64(len(postBaseBytes)), - storage.NewFrameTable(storage.CompressionZstd, nil)) + p := newPeerStorageProvider(base, client, state) + seekable, err := p.OpenSeekable(t.Context(), "build-1/memfile", storage.MemfileObjectType) require.NoError(t, err) - got, err = io.ReadAll(rc) + + rc, err := seekable.OpenRangeReader(t.Context(), 0, 10, + storage.NewFrameTable(storage.CompressionNone, nil)) require.NoError(t, err) require.NoError(t, rc.Close()) - assert.Equal(t, postBaseBytes, got) + + require.True(t, state.uploaded.Load()) + require.Nil(t, state.header(storage.MemfileName), "V3 path must not stash a pending header") } diff --git a/packages/orchestrator/pkg/sandbox/template/peerclient/storage.go b/packages/orchestrator/pkg/sandbox/template/peerclient/storage.go index 9e7b7b6883..a57d3e76fa 100644 --- a/packages/orchestrator/pkg/sandbox/template/peerclient/storage.go +++ b/packages/orchestrator/pkg/sandbox/template/peerclient/storage.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "io" - "sync/atomic" "time" "go.opentelemetry.io/otel" @@ -15,6 +14,7 @@ import ( "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator" "github.com/e2b-dev/infra/packages/shared/pkg/storage" + "github.com/e2b-dev/infra/packages/shared/pkg/storage/header" "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" "github.com/e2b-dev/infra/packages/shared/pkg/utils" ) @@ -42,8 +42,10 @@ var ( attrResolvePeer = attribute.String("peer_resolve", "peer") attrResolveUploaded = attribute.String("peer_resolve", "uploaded") - attrPeerHitTrue = attribute.Bool("peer_hit", true) - attrPeerHitFalse = attribute.Bool("peer_hit", false) + attrPeerHitTrue = attribute.String("peer_hit", "true") + attrPeerHitFalse = attribute.String("peer_hit", "false") + attrPeerHitTransition = attribute.String("peer_hit", "transitioned") + attrPeerHitMiss = attribute.String("peer_hit", "miss") ) var _ storage.StorageProvider = (*routingProvider)(nil) @@ -76,7 +78,12 @@ func (p *routingProvider) resolveProvider(ctx context.Context, buildID string) s span.SetAttributes(attribute.String("peer_address", res.addr)) - return newPeerStorageProvider(p.base, res.client, res.uploaded) + return newPeerStorageProvider(p.base, res.client, res.state) +} + +// PendingHeader implements storage.HeaderProvider. +func (p *routingProvider) PendingHeader(buildID, name string) *header.Header { + return p.resolver.PendingHeader(buildID, name) } func (p *routingProvider) OpenBlob(ctx context.Context, path string, objType storage.ObjectType) (storage.Blob, error) { @@ -109,20 +116,21 @@ var _ storage.StorageProvider = (*peerStorageProvider)(nil) type peerStorageProvider struct { base storage.StorageProvider peerClient orchestrator.ChunkServiceClient - // uploaded is set to true when the peer signals that GCS upload is complete - // (use_storage=true). Once set, all subsequent reads skip the peer and go to base. - uploaded *atomic.Bool + // state holds the per-buildID coordination shared by all peer{Blob,Seekable} + // for this build: uploaded flag (route to base post-flip) and parsed V4 + // headers (delivered inline on UseStorage; build.File installs them). + state *peerState } func newPeerStorageProvider( base storage.StorageProvider, peerClient orchestrator.ChunkServiceClient, - uploaded *atomic.Bool, + state *peerState, ) storage.StorageProvider { return &peerStorageProvider{ base: base, peerClient: peerClient, - uploaded: uploaded, + state: state, } } @@ -131,10 +139,10 @@ func (p *peerStorageProvider) OpenBlob(_ context.Context, path string, objType s return &peerBlob{ peerHandle: peerHandle{ - client: p.peerClient, - buildID: buildID, - name: t, - uploaded: p.uploaded, + client: p.peerClient, + buildID: buildID, + name: t, + state: p.state, }, openBase: func(ctx context.Context) (storage.Blob, error) { return p.base.OpenBlob(ctx, path, objType) @@ -156,10 +164,10 @@ func (p *peerStorageProvider) OpenSeekable(_ context.Context, path string, objTy return &peerSeekable{ peerHandle: peerHandle{ - client: p.peerClient, - buildID: buildID, - name: t, - uploaded: p.uploaded, + client: p.peerClient, + buildID: buildID, + name: t, + state: p.state, }, basePersistence: p.base, objType: objType, @@ -178,86 +186,112 @@ func (p *peerStorageProvider) GetDetails() string { return p.base.GetDetails() } -// checkPeerAvailability also marks the uploaded flag when UseStorage is set. -func checkPeerAvailability(avail *orchestrator.PeerAvailability, uploaded *atomic.Bool) bool { - if avail.GetNotAvailable() { - return false - } - +// checkPeerAvailability classifies a peer availability message. On UseStorage +// it flips state.uploaded and (if header bytes are present) stashes the +// parsed V4 header for File to install. Parse errors are dropped silently. +func checkPeerAvailability(avail *orchestrator.PeerAvailability, state *peerState, name string) result { if avail.GetUseStorage() { - uploaded.Store(true) + if state != nil { + if bytes := avail.GetHeaderBytes(); len(bytes) > 0 { + if h, err := header.DeserializeBytes(bytes); err == nil { + state.setHeader(name, h) + } + } + state.uploaded.Store(true) + } - return false + return transitioned } - return true + if avail.GetNotAvailable() { + return missed + } + + return served } // peerHandle holds the peer-side identity shared by peerBlob and peerSeekable. -// fileName is the basic (uncompressed) name — peer fetches always use it. type peerHandle struct { - client orchestrator.ChunkServiceClient - buildID string - name string - uploaded *atomic.Bool + client orchestrator.ChunkServiceClient + buildID string + name string + state *peerState } -// peerAttempt is the result of a peer read attempt. -// hit=true means the peer had data (value is populated); when hit=true and the -// caller also returns a non-nil error the helper records a partial failure. +// result enumerates how a peer attempt resolved. Zero value is unnamed and +// covers the early-return / real-error paths; tryPeer's default arm handles +// it as a failure. +type result int + +const ( + served result = iota + 1 // peer returned data + missed // NotAvailable signal + transitioned // UseStorage signal +) + type peerAttempt[T any] struct { - value T - bytes int64 - hit bool + value T + bytes int64 + result result } -// tryPeer attempts a peer read if the peer is still authoritative for this -// build. It records peer telemetry and returns the attempt; the caller -// inspects res.hit to decide whether to fall through to base. tryPeer never -// opens base. +// tryPeer runs peerFn if state.uploaded is still false, records telemetry, +// and returns (value, served, err). Availability signals (Miss/Transitioned) +// are recorded as Success and surface as served=false; only real RPC errors +// propagate as err. func tryPeer[T any]( ctx context.Context, h *peerHandle, spanName string, opAttr attribute.KeyValue, peerFn func(ctx context.Context) (peerAttempt[T], error), -) (peerAttempt[T], error) { +) (T, bool, error) { ctx, span := tracer.Start(ctx, spanName, trace.WithAttributes( attribute.String("file_name", h.name), )) defer span.End() - if h.uploaded.Load() { + var zero T + if h.state != nil && h.state.uploaded.Load() { span.SetAttributes(attrPeerHitFalse) - return peerAttempt[T]{}, nil + return zero, false, nil } timer := peerReadTimerFactory.Begin(opAttr) res, err := peerFn(ctx) - if res.hit { + switch res.result { + case served: if err != nil { + // partial failure: data was returned but streaming/closing failed span.RecordError(err) timer.Failure(ctx, res.bytes) - return res, err + return res.value, true, err } - span.SetAttributes(attrPeerHitTrue) timer.Success(ctx, res.bytes) - return res, nil - } + return res.value, true, nil - if err != nil { - span.RecordError(err) - } + case transitioned: + span.SetAttributes(attrPeerHitTransition) + timer.Success(ctx, 0) + + case missed: + span.SetAttributes(attrPeerHitMiss) + timer.Success(ctx, 0) - timer.Failure(ctx, 0) - span.SetAttributes(attrPeerHitFalse) + default: + if err != nil { + span.RecordError(err) + } + span.SetAttributes(attrPeerHitFalse) + timer.Failure(ctx, 0) + } - return peerAttempt[T]{}, nil + return zero, false, nil } var _ io.ReadCloser = (*peerStreamReader)(nil) diff --git a/packages/orchestrator/pkg/sandbox/template/peerclient/storage_test.go b/packages/orchestrator/pkg/sandbox/template/peerclient/storage_test.go index ac34565de0..95e37228ac 100644 --- a/packages/orchestrator/pkg/sandbox/template/peerclient/storage_test.go +++ b/packages/orchestrator/pkg/sandbox/template/peerclient/storage_test.go @@ -3,7 +3,6 @@ package peerclient import ( "bytes" "io" - "sync/atomic" "testing" "github.com/stretchr/testify/assert" @@ -29,7 +28,7 @@ func TestPeerStorageProvider_OpenBlob_ExtractsFileName(t *testing.T) { base := storage.NewMockStorageProvider(t) - p := newPeerStorageProvider(base, client, &atomic.Bool{}) + p := newPeerStorageProvider(base, client, &peerState{}) blob, err := p.OpenBlob(t.Context(), "build-1/snapfile", storage.SnapfileObjectType) require.NoError(t, err) @@ -49,7 +48,7 @@ func TestPeerStorageProvider_OpenSeekable_ExtractsFileName(t *testing.T) { base := storage.NewMockStorageProvider(t) - p := newPeerStorageProvider(base, client, &atomic.Bool{}) + p := newPeerStorageProvider(base, client, &peerState{}) ff, err := p.OpenSeekable(t.Context(), "build-1/memfile", storage.MemfileObjectType) require.NoError(t, err) diff --git a/packages/orchestrator/pkg/server/chunks.go b/packages/orchestrator/pkg/server/chunks.go index 0e7c3a2085..72737019a1 100644 --- a/packages/orchestrator/pkg/server/chunks.go +++ b/packages/orchestrator/pkg/server/chunks.go @@ -12,13 +12,30 @@ import ( "github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/template/peerserver" "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator" + "github.com/e2b-dev/infra/packages/shared/pkg/storage" "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" ) -var ( - peerNotAvailable = &orchestrator.PeerAvailability{NotAvailable: true} - peerUseStorage = &orchestrator.PeerAvailability{UseStorage: true} -) +var peerNotAvailable = &orchestrator.PeerAvailability{NotAvailable: true} + +// uploadedHeaders carries serialized V4 headers inline on UseStorage +// responses so the client skips a separate storage round-trip. V3: both nil. +type uploadedHeaders struct { + memfile []byte + rootfs []byte +} + +func (b uploadedHeaders) useStorage(name string) *orchestrator.PeerAvailability { + avail := &orchestrator.PeerAvailability{UseStorage: true} + switch name { + case storage.MemfileName: + avail.HeaderBytes = b.memfile + case storage.RootfsName: + avail.HeaderBytes = b.rootfs + } + + return avail +} // seekableStreamSender implements peerserver.Sender over a gRPC server stream (for seekable files). type seekableStreamSender struct { @@ -52,10 +69,10 @@ func toGRPCError(err error) error { func (s *Server) GetBuildFileSize(ctx context.Context, req *orchestrator.GetBuildFileSizeRequest) (*orchestrator.GetBuildFileSizeResponse, error) { telemetry.SetAttributes(ctx, telemetry.WithBuildID(req.GetBuildId()), attribute.String("file_name", req.GetName())) - if s.uploadedBuilds.Get(req.GetBuildId()) != nil { + if u := s.uploadedBuilds.Get(req.GetBuildId()); u != nil { telemetry.SetAttributes(ctx, attribute.Bool("uploaded", true)) - return &orchestrator.GetBuildFileSizeResponse{Availability: peerUseStorage}, nil + return &orchestrator.GetBuildFileSizeResponse{Availability: u.Value().useStorage(req.GetName())}, nil } src, err := peerserver.ResolveSeekable(s.templateCache, req.GetBuildId(), req.GetName()) @@ -80,10 +97,10 @@ func (s *Server) GetBuildFileSize(ctx context.Context, req *orchestrator.GetBuil func (s *Server) GetBuildFileExists(ctx context.Context, req *orchestrator.GetBuildFileExistsRequest) (*orchestrator.GetBuildFileExistsResponse, error) { telemetry.SetAttributes(ctx, telemetry.WithBuildID(req.GetBuildId()), attribute.String("file_name", req.GetName())) - if s.uploadedBuilds.Get(req.GetBuildId()) != nil { + if u := s.uploadedBuilds.Get(req.GetBuildId()); u != nil { telemetry.SetAttributes(ctx, attribute.Bool("uploaded", true)) - return &orchestrator.GetBuildFileExistsResponse{Availability: peerUseStorage}, nil + return &orchestrator.GetBuildFileExistsResponse{Availability: u.Value().useStorage(req.GetName())}, nil } src, err := peerserver.ResolveBlob(s.templateCache, req.GetBuildId(), req.GetName()) @@ -124,10 +141,10 @@ func (s *Server) ReadAtBuildSeekable(req *orchestrator.ReadAtBuildSeekableReques attribute.Int64("length", length), ) - if s.uploadedBuilds.Get(req.GetBuildId()) != nil { + if u := s.uploadedBuilds.Get(req.GetBuildId()); u != nil { telemetry.SetAttributes(ctx, attribute.Bool("uploaded", true)) - return stream.Send(&orchestrator.ReadAtBuildSeekableResponse{Availability: peerUseStorage}) + return stream.Send(&orchestrator.ReadAtBuildSeekableResponse{Availability: u.Value().useStorage(req.GetName())}) } src, err := peerserver.ResolveSeekable(s.templateCache, req.GetBuildId(), req.GetName()) @@ -159,10 +176,10 @@ func (s *Server) GetBuildBlob(req *orchestrator.GetBuildBlobRequest, stream orch attribute.String("file_name", req.GetName()), ) - if s.uploadedBuilds.Get(req.GetBuildId()) != nil { + if u := s.uploadedBuilds.Get(req.GetBuildId()); u != nil { telemetry.SetAttributes(ctx, attribute.Bool("uploaded", true)) - return stream.Send(&orchestrator.GetBuildBlobResponse{Availability: peerUseStorage}) + return stream.Send(&orchestrator.GetBuildBlobResponse{Availability: u.Value().useStorage(req.GetName())}) } src, err := peerserver.ResolveBlob(s.templateCache, req.GetBuildId(), req.GetName()) diff --git a/packages/orchestrator/pkg/server/main.go b/packages/orchestrator/pkg/server/main.go index fb53dee8ce..61059241ad 100644 --- a/packages/orchestrator/pkg/server/main.go +++ b/packages/orchestrator/pkg/server/main.go @@ -46,7 +46,7 @@ type Server struct { sbxEventsService *events.EventsService startingSandboxes *semaphore.Weighted peerRegistry peerclient.Registry - uploadedBuilds *ttlcache.Cache[string, struct{}] + uploadedBuilds *ttlcache.Cache[string, uploadedHeaders] uploads *sandbox.Uploads sandboxCreateDuration metric.Int64Histogram } @@ -68,8 +68,9 @@ type ServiceConfig struct { } func New(cfg ServiceConfig) (*Server, error) { - uploadedBuilds := ttlcache.New[string, struct{}]( - ttlcache.WithTTL[string, struct{}](uploadedBuildsTTL), + // TODO - we should not be caching the headers (for 1hr) if P2P FF is off. + uploadedBuilds := ttlcache.New( + ttlcache.WithTTL[string, uploadedHeaders](uploadedBuildsTTL), ) go uploadedBuilds.Start() diff --git a/packages/orchestrator/pkg/server/sandboxes.go b/packages/orchestrator/pkg/server/sandboxes.go index 60966a5f37..fa18375100 100644 --- a/packages/orchestrator/pkg/server/sandboxes.go +++ b/packages/orchestrator/pkg/server/sandboxes.go @@ -790,7 +790,10 @@ func (s *Server) snapshotAndCacheSandbox( return } - s.uploadedBuilds.Set(meta.Template.BuildID, struct{}{}, ttlcache.DefaultTTL) + s.uploadedBuilds.Set(meta.Template.BuildID, uploadedHeaders{ + memfile: upload.MemfileHeader(), + rootfs: upload.RootfsHeader(), + }, ttlcache.DefaultTTL) if err := s.peerRegistry.Unregister(ctx, meta.Template.BuildID); err != nil { logger.L().Warn(ctx, "failed to unregister peer address from routing", zap.String("build_id", meta.Template.BuildID), zap.Error(err)) diff --git a/packages/shared/pkg/grpc/orchestrator/chunks.pb.go b/packages/shared/pkg/grpc/orchestrator/chunks.pb.go index 94372f198f..74e597c76e 100644 --- a/packages/shared/pkg/grpc/orchestrator/chunks.pb.go +++ b/packages/shared/pkg/grpc/orchestrator/chunks.pb.go @@ -34,6 +34,8 @@ type PeerAvailability struct { // caller should switch to reading from remote storage directly instead of // this peer. UseStorage bool `protobuf:"varint,2,opt,name=use_storage,json=useStorage,proto3" json:"use_storage,omitempty"` + // header_bytes carries the post-upload V4 header (framed files only). + HeaderBytes []byte `protobuf:"bytes,3,opt,name=header_bytes,json=headerBytes,proto3" json:"header_bytes,omitempty"` } func (x *PeerAvailability) Reset() { @@ -82,6 +84,13 @@ func (x *PeerAvailability) GetUseStorage() bool { return false } +func (x *PeerAvailability) GetHeaderBytes() []byte { + if x != nil { + return x.HeaderBytes + } + return nil +} + type GetBuildFileSizeRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -538,83 +547,85 @@ func (x *GetBuildBlobResponse) GetAvailability() *PeerAvailability { var File_chunks_proto protoreflect.FileDescriptor var file_chunks_proto_rawDesc = []byte{ - 0x0a, 0x0c, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x58, + 0x0a, 0x0c, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x7b, 0x0a, 0x10, 0x50, 0x65, 0x65, 0x72, 0x41, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x12, 0x23, 0x0a, 0x0d, 0x6e, 0x6f, 0x74, 0x5f, 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x6c, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x6e, 0x6f, 0x74, 0x41, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x6c, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x75, 0x73, 0x65, 0x5f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x75, 0x73, - 0x65, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x22, 0x48, 0x0a, 0x17, 0x47, 0x65, 0x74, 0x42, - 0x75, 0x69, 0x6c, 0x64, 0x46, 0x69, 0x6c, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x5f, 0x69, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x49, 0x64, 0x12, 0x12, - 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, - 0x6d, 0x65, 0x22, 0x70, 0x0a, 0x18, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x46, 0x69, - 0x6c, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, - 0x0a, 0x0a, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x03, 0x52, 0x09, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x35, 0x0a, - 0x0c, 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x41, 0x76, 0x61, 0x69, 0x6c, 0x61, - 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x52, 0x0c, 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x69, - 0x6c, 0x69, 0x74, 0x79, 0x22, 0x4a, 0x0a, 0x19, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, - 0x46, 0x69, 0x6c, 0x65, 0x45, 0x78, 0x69, 0x73, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x19, 0x0a, 0x08, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x07, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, - 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, - 0x22, 0x53, 0x0a, 0x1a, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x46, 0x69, 0x6c, 0x65, - 0x45, 0x78, 0x69, 0x73, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x35, - 0x0a, 0x0c, 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x41, 0x76, 0x61, 0x69, 0x6c, - 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x52, 0x0c, 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, - 0x69, 0x6c, 0x69, 0x74, 0x79, 0x22, 0x7b, 0x0a, 0x1a, 0x52, 0x65, 0x61, 0x64, 0x41, 0x74, 0x42, - 0x75, 0x69, 0x6c, 0x64, 0x53, 0x65, 0x65, 0x6b, 0x61, 0x62, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x5f, 0x69, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x49, 0x64, 0x12, 0x12, - 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, - 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x03, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x6c, 0x65, - 0x6e, 0x67, 0x74, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x6c, 0x65, 0x6e, 0x67, - 0x74, 0x68, 0x22, 0x68, 0x0a, 0x1b, 0x52, 0x65, 0x61, 0x64, 0x41, 0x74, 0x42, 0x75, 0x69, 0x6c, - 0x64, 0x53, 0x65, 0x65, 0x6b, 0x61, 0x62, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x35, 0x0a, 0x0c, 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, - 0x69, 0x6c, 0x69, 0x74, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x50, 0x65, - 0x65, 0x72, 0x41, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x52, 0x0c, - 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x22, 0x44, 0x0a, 0x13, - 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x5f, 0x69, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x49, 0x64, 0x12, 0x12, - 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, - 0x6d, 0x65, 0x22, 0x61, 0x0a, 0x14, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x42, 0x6c, - 0x6f, 0x62, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, - 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x35, - 0x0a, 0x0c, 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x41, 0x76, 0x61, 0x69, 0x6c, - 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x52, 0x0c, 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, - 0x69, 0x6c, 0x69, 0x74, 0x79, 0x32, 0xb9, 0x02, 0x0a, 0x0c, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x53, - 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x47, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, - 0x6c, 0x64, 0x46, 0x69, 0x6c, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x18, 0x2e, 0x47, 0x65, 0x74, - 0x42, 0x75, 0x69, 0x6c, 0x64, 0x46, 0x69, 0x6c, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x46, - 0x69, 0x6c, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x4d, 0x0a, 0x12, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x46, 0x69, 0x6c, 0x65, 0x45, - 0x78, 0x69, 0x73, 0x74, 0x73, 0x12, 0x1a, 0x2e, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, - 0x46, 0x69, 0x6c, 0x65, 0x45, 0x78, 0x69, 0x73, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x1b, 0x2e, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x46, 0x69, 0x6c, 0x65, - 0x45, 0x78, 0x69, 0x73, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x52, - 0x0a, 0x13, 0x52, 0x65, 0x61, 0x64, 0x41, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x53, 0x65, 0x65, - 0x6b, 0x61, 0x62, 0x6c, 0x65, 0x12, 0x1b, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x41, 0x74, 0x42, 0x75, - 0x69, 0x6c, 0x64, 0x53, 0x65, 0x65, 0x6b, 0x61, 0x62, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x41, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, - 0x53, 0x65, 0x65, 0x6b, 0x61, 0x62, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x30, 0x01, 0x12, 0x3d, 0x0a, 0x0c, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x42, 0x6c, - 0x6f, 0x62, 0x12, 0x14, 0x2e, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x42, 0x6c, 0x6f, - 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x47, 0x65, 0x74, 0x42, 0x75, - 0x69, 0x6c, 0x64, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, - 0x01, 0x42, 0x2f, 0x5a, 0x2d, 0x68, 0x74, 0x74, 0x70, 0x73, 0x3a, 0x2f, 0x2f, 0x67, 0x69, 0x74, - 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x65, 0x32, 0x62, 0x2d, 0x64, 0x65, 0x76, 0x2f, - 0x69, 0x6e, 0x66, 0x72, 0x61, 0x2f, 0x6f, 0x72, 0x63, 0x68, 0x65, 0x73, 0x74, 0x72, 0x61, 0x74, - 0x6f, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x68, 0x65, 0x61, 0x64, + 0x65, 0x72, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, + 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x42, 0x79, 0x74, 0x65, 0x73, 0x22, 0x48, 0x0a, 0x17, 0x47, + 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x46, 0x69, 0x6c, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x5f, + 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x49, + 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x22, 0x70, 0x0a, 0x18, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, + 0x64, 0x46, 0x69, 0x6c, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x53, 0x69, 0x7a, 0x65, + 0x12, 0x35, 0x0a, 0x0c, 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x41, 0x76, 0x61, + 0x69, 0x6c, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x52, 0x0c, 0x61, 0x76, 0x61, 0x69, 0x6c, + 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x22, 0x4a, 0x0a, 0x19, 0x47, 0x65, 0x74, 0x42, 0x75, + 0x69, 0x6c, 0x64, 0x46, 0x69, 0x6c, 0x65, 0x45, 0x78, 0x69, 0x73, 0x74, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x5f, 0x69, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x49, 0x64, 0x12, + 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, + 0x61, 0x6d, 0x65, 0x22, 0x53, 0x0a, 0x1a, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x46, + 0x69, 0x6c, 0x65, 0x45, 0x78, 0x69, 0x73, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x35, 0x0a, 0x0c, 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, + 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x41, 0x76, + 0x61, 0x69, 0x6c, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x52, 0x0c, 0x61, 0x76, 0x61, 0x69, + 0x6c, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x22, 0x7b, 0x0a, 0x1a, 0x52, 0x65, 0x61, 0x64, + 0x41, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x53, 0x65, 0x65, 0x6b, 0x61, 0x62, 0x6c, 0x65, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x5f, + 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x49, + 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x16, 0x0a, + 0x06, 0x6c, 0x65, 0x6e, 0x67, 0x74, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x6c, + 0x65, 0x6e, 0x67, 0x74, 0x68, 0x22, 0x68, 0x0a, 0x1b, 0x52, 0x65, 0x61, 0x64, 0x41, 0x74, 0x42, + 0x75, 0x69, 0x6c, 0x64, 0x53, 0x65, 0x65, 0x6b, 0x61, 0x62, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x35, 0x0a, 0x0c, 0x61, 0x76, 0x61, 0x69, + 0x6c, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, + 0x2e, 0x50, 0x65, 0x65, 0x72, 0x41, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, + 0x79, 0x52, 0x0c, 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x22, + 0x44, 0x0a, 0x13, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x42, 0x6c, 0x6f, 0x62, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x5f, + 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x49, + 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x22, 0x61, 0x0a, 0x14, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, + 0x64, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, + 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, + 0x61, 0x12, 0x35, 0x0a, 0x0c, 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, + 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x41, 0x76, + 0x61, 0x69, 0x6c, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x52, 0x0c, 0x61, 0x76, 0x61, 0x69, + 0x6c, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x32, 0xb9, 0x02, 0x0a, 0x0c, 0x43, 0x68, 0x75, + 0x6e, 0x6b, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x47, 0x0a, 0x10, 0x47, 0x65, 0x74, + 0x42, 0x75, 0x69, 0x6c, 0x64, 0x46, 0x69, 0x6c, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x18, 0x2e, + 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x46, 0x69, 0x6c, 0x65, 0x53, 0x69, 0x7a, 0x65, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, + 0x6c, 0x64, 0x46, 0x69, 0x6c, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x4d, 0x0a, 0x12, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x46, 0x69, + 0x6c, 0x65, 0x45, 0x78, 0x69, 0x73, 0x74, 0x73, 0x12, 0x1a, 0x2e, 0x47, 0x65, 0x74, 0x42, 0x75, + 0x69, 0x6c, 0x64, 0x46, 0x69, 0x6c, 0x65, 0x45, 0x78, 0x69, 0x73, 0x74, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x46, + 0x69, 0x6c, 0x65, 0x45, 0x78, 0x69, 0x73, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x52, 0x0a, 0x13, 0x52, 0x65, 0x61, 0x64, 0x41, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, + 0x53, 0x65, 0x65, 0x6b, 0x61, 0x62, 0x6c, 0x65, 0x12, 0x1b, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x41, + 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x53, 0x65, 0x65, 0x6b, 0x61, 0x62, 0x6c, 0x65, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x41, 0x74, 0x42, 0x75, + 0x69, 0x6c, 0x64, 0x53, 0x65, 0x65, 0x6b, 0x61, 0x62, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x30, 0x01, 0x12, 0x3d, 0x0a, 0x0c, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, + 0x64, 0x42, 0x6c, 0x6f, 0x62, 0x12, 0x14, 0x2e, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, + 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x47, 0x65, + 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x30, 0x01, 0x42, 0x2f, 0x5a, 0x2d, 0x68, 0x74, 0x74, 0x70, 0x73, 0x3a, 0x2f, 0x2f, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x65, 0x32, 0x62, 0x2d, 0x64, + 0x65, 0x76, 0x2f, 0x69, 0x6e, 0x66, 0x72, 0x61, 0x2f, 0x6f, 0x72, 0x63, 0x68, 0x65, 0x73, 0x74, + 0x72, 0x61, 0x74, 0x6f, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/packages/shared/pkg/storage/header/serialization.go b/packages/shared/pkg/storage/header/serialization.go index c9c86fe0b9..0f3036bc15 100644 --- a/packages/shared/pkg/storage/header/serialization.go +++ b/packages/shared/pkg/storage/header/serialization.go @@ -56,25 +56,31 @@ func LoadHeader(ctx context.Context, s storage.StorageProvider, path string) (*H return DeserializeBytes(data) } -// StoreHeader serializes a header and uploads it to long-term storage. -// Refuses to persist a header still flagged as in-flight — the upload pipeline -// must clear IncompletePendingUpload before reaching here. -func StoreHeader(ctx context.Context, s storage.StorageProvider, path string, h *Header) error { +// StoreHeader serializes a header and uploads it to long-term storage, also +// returning the serialized bytes so the caller can reuse them in-process +// (e.g. for inline delivery to peer clients) without re-serializing. Refuses +// to persist a header still flagged as in-flight — the upload pipeline must +// clear IncompletePendingUpload before reaching here. +func StoreHeader(ctx context.Context, s storage.StorageProvider, path string, h *Header) ([]byte, error) { if h.IncompletePendingUpload { - return fmt.Errorf("refusing to persist incomplete header for %s", path) + return nil, fmt.Errorf("refusing to persist incomplete header for %s", path) } data, err := SerializeHeader(h) if err != nil { - return fmt.Errorf("serialize header: %w", err) + return nil, fmt.Errorf("serialize header: %w", err) } blob, err := s.OpenBlob(ctx, path, storage.MetadataObjectType) if err != nil { - return fmt.Errorf("open blob %s: %w", path, err) + return nil, fmt.Errorf("open blob %s: %w", path, err) + } + + if err := blob.Put(ctx, data); err != nil { + return nil, err } - return blob.Put(ctx, data) + return data, nil } // Deserialize reads a header from a storage Blob (legacy API). diff --git a/packages/shared/pkg/storage/storage.go b/packages/shared/pkg/storage/storage.go index 27a77e78ac..474d62f370 100644 --- a/packages/shared/pkg/storage/storage.go +++ b/packages/shared/pkg/storage/storage.go @@ -167,14 +167,9 @@ func UploadBlob(ctx context.Context, provider StorageProvider, remotePath string return blob.Put(ctx, data, opts...) } -// PeerTransitionedError is returned by the peer Seekable when the remote -// storage upload has completed; the caller should re-load the V4 header from -// storage. -type PeerTransitionedError struct{} - -func (e *PeerTransitionedError) Error() string { - return "peer upload completed, reload header from storage" -} +// ErrPeerAborted signals the peer aborted an in-flight stream; the caller +// should close and re-issue OpenRangeReader (next attempt hits base). +var ErrPeerAborted = errors.New("peer aborted stream mid-flight") // StorageConfig holds the configuration for creating a storage provider. // Both GetLocalBasePath and GetBucketName are evaluated lazily so that From e8aa197f5720b59bbed9267e5ccddfc9f70cfcc3 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Mon, 11 May 2026 12:06:05 -0700 Subject: [PATCH 2/3] PR feedback: log header deserialize failures MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit checkPeerAvailability silently dropped errors from header.DeserializeBytes on peer-delivered V4 header bytes. Now logs loudly via logger.L().Error with file name + error. The transition to storage still proceeds — File keeps its existing header rather than installing a corrupted one, matching the V3 fallback path. Gating uploaded.Store(true) on parse success (as the bot suggested) would re-route the next read through the same peer and loop on the same bad bytes; logging surfaces the corruption without that risk. --- packages/orchestrator/pkg/sandbox/build/build.go | 15 +++++---------- .../pkg/sandbox/template/peerclient/blob.go | 6 +++--- .../pkg/sandbox/template/peerclient/seekable.go | 6 +++--- .../pkg/sandbox/template/peerclient/storage.go | 10 ++++++++-- 4 files changed, 19 insertions(+), 18 deletions(-) diff --git a/packages/orchestrator/pkg/sandbox/build/build.go b/packages/orchestrator/pkg/sandbox/build/build.go index 6565b8c3b9..faf230bb7c 100644 --- a/packages/orchestrator/pkg/sandbox/build/build.go +++ b/packages/orchestrator/pkg/sandbox/build/build.go @@ -57,8 +57,7 @@ func (b *File) SwapHeader(h *header.Header) { b.header.Store(h) } -func (b *File) ReadAt(ctx context.Context, p []byte, off int64) (int, error) { - var n int +func (b *File) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) { for n < len(p) { h := b.installPendingHeader() @@ -144,12 +143,9 @@ func (b *File) Slice(ctx context.Context, off, _ int64) ([]byte, error) { return diff.Slice(ctx, int64(mappedBuild.Offset), int64(h.Metadata.BlockSize), ft) } -// installPendingHeader installs a header the peer delivered via UseStorage -// and returns the current header — either the freshly-installed one or the -// pre-existing one if no install was needed. Idempotent: concurrent readers -// all CAS the same value. Skips the CAS once the pending header is already -// installed (pointer equality), keeping the per-iteration cost to one atomic -// Load on the steady-state path. +// installPendingHeader CAS-installs a peer-delivered post-upload header if +// one is pending and returns the current header. Steady-state cost is one +// atomic Load (pointer equality short-circuits the CAS). func (b *File) installPendingHeader() *header.Header { cur := b.header.Load() if b.headers == nil { @@ -162,8 +158,7 @@ func (b *File) installPendingHeader() *header.Header { if b.header.CompareAndSwap(cur, h) { return h } - // Lost the CAS — an external SwapHeader landed concurrently. Re-load to - // surface whatever's now authoritative. + return b.header.Load() } diff --git a/packages/orchestrator/pkg/sandbox/template/peerclient/blob.go b/packages/orchestrator/pkg/sandbox/template/peerclient/blob.go index 76980a7a17..df60636aab 100644 --- a/packages/orchestrator/pkg/sandbox/template/peerclient/blob.go +++ b/packages/orchestrator/pkg/sandbox/template/peerclient/blob.go @@ -102,7 +102,7 @@ func (b *peerBlob) Exists(ctx context.Context) (bool, error) { return peerAttempt[bool]{}, err } - outcome := checkPeerAvailability(resp.GetAvailability(), b.state, b.name) + outcome := checkPeerAvailability(ctx, resp.GetAvailability(), b.state, b.name) if outcome != served { return peerAttempt[bool]{result: outcome}, nil } @@ -151,7 +151,7 @@ func openPeerBlobStream( return nil, 0, fmt.Errorf("recv first blob message: %w", err) } - if outcome := checkPeerAvailability(msg.GetAvailability(), state, req.GetName()); outcome != served { + if outcome := checkPeerAvailability(ctx, msg.GetAvailability(), state, req.GetName()); outcome != served { return nil, outcome, nil } @@ -170,7 +170,7 @@ func openPeerBlobStream( return nil, err } - _ = checkPeerAvailability(m.GetAvailability(), state, req.GetName()) + _ = checkPeerAvailability(ctx, m.GetAvailability(), state, req.GetName()) return m.GetData(), nil }, served, nil diff --git a/packages/orchestrator/pkg/sandbox/template/peerclient/seekable.go b/packages/orchestrator/pkg/sandbox/template/peerclient/seekable.go index 2e4d97c6e4..eb09b0d000 100644 --- a/packages/orchestrator/pkg/sandbox/template/peerclient/seekable.go +++ b/packages/orchestrator/pkg/sandbox/template/peerclient/seekable.go @@ -69,7 +69,7 @@ func (s *peerSeekable) Size(ctx context.Context) (int64, error) { return peerAttempt[int64]{}, err } - outcome := checkPeerAvailability(resp.GetAvailability(), s.state, s.name) + outcome := checkPeerAvailability(ctx, resp.GetAvailability(), s.state, s.name) if outcome != served { return peerAttempt[int64]{result: outcome}, nil } @@ -162,7 +162,7 @@ func openPeerSeekableStream( return nil, 0, fmt.Errorf("recv first seekable message: %w", err) } - if outcome := checkPeerAvailability(msg.GetAvailability(), state, req.GetName()); outcome != served { + if outcome := checkPeerAvailability(ctx, msg.GetAvailability(), state, req.GetName()); outcome != served { return nil, outcome, nil } @@ -181,7 +181,7 @@ func openPeerSeekableStream( return nil, err } - if checkPeerAvailability(m.GetAvailability(), state, req.GetName()) != served { + if checkPeerAvailability(ctx, m.GetAvailability(), state, req.GetName()) != served { return nil, storage.ErrPeerAborted } diff --git a/packages/orchestrator/pkg/sandbox/template/peerclient/storage.go b/packages/orchestrator/pkg/sandbox/template/peerclient/storage.go index a57d3e76fa..913a83daf1 100644 --- a/packages/orchestrator/pkg/sandbox/template/peerclient/storage.go +++ b/packages/orchestrator/pkg/sandbox/template/peerclient/storage.go @@ -11,8 +11,10 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator" + "github.com/e2b-dev/infra/packages/shared/pkg/logger" "github.com/e2b-dev/infra/packages/shared/pkg/storage" "github.com/e2b-dev/infra/packages/shared/pkg/storage/header" "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" @@ -188,13 +190,17 @@ func (p *peerStorageProvider) GetDetails() string { // checkPeerAvailability classifies a peer availability message. On UseStorage // it flips state.uploaded and (if header bytes are present) stashes the -// parsed V4 header for File to install. Parse errors are dropped silently. -func checkPeerAvailability(avail *orchestrator.PeerAvailability, state *peerState, name string) result { +// parsed V4 header for File to install. Parse failures are logged and the +// transition still proceeds — the File keeps its existing header rather +// than installing a corrupted one, matching the V3 fallback path. +func checkPeerAvailability(ctx context.Context, avail *orchestrator.PeerAvailability, state *peerState, name string) result { if avail.GetUseStorage() { if state != nil { if bytes := avail.GetHeaderBytes(); len(bytes) > 0 { if h, err := header.DeserializeBytes(bytes); err == nil { state.setHeader(name, h) + } else { + logger.L().Error(ctx, "peer header deserialize failed", zap.String("file_name", name), zap.Error(err)) } } state.uploaded.Store(true) From ea5233a8cc397f78bdd86d1504888ff5120cfaae Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Mon, 11 May 2026 12:27:06 -0700 Subject: [PATCH 3/3] restore comments dropped gratuitously --- .../orchestrator/pkg/sandbox/template/peerclient/blob.go | 4 +++- .../pkg/sandbox/template/peerclient/resolver.go | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/orchestrator/pkg/sandbox/template/peerclient/blob.go b/packages/orchestrator/pkg/sandbox/template/peerclient/blob.go index df60636aab..22ec1b84c4 100644 --- a/packages/orchestrator/pkg/sandbox/template/peerclient/blob.go +++ b/packages/orchestrator/pkg/sandbox/template/peerclient/blob.go @@ -133,7 +133,6 @@ func (b *peerBlob) Put(ctx context.Context, data []byte, opts ...storage.PutOpti // openPeerBlobStream opens a GetBuildBlob stream, checks peer availability, // and returns a recv function that yields data chunks starting with the first message's data. -// Mid-stream availability changes are side-effect only; the in-flight stream completes from peer. // The passed context HAS to be canceled by the caller when done with the stream to avoid leaks. func openPeerBlobStream( ctx context.Context, @@ -170,6 +169,9 @@ func openPeerBlobStream( return nil, err } + // Flip the uploaded flag if the peer signals use_storage; the current + // stream keeps reading from the peer, but subsequent operations will + // go directly to GCS. _ = checkPeerAvailability(ctx, m.GetAvailability(), state, req.GetName()) return m.GetData(), nil diff --git a/packages/orchestrator/pkg/sandbox/template/peerclient/resolver.go b/packages/orchestrator/pkg/sandbox/template/peerclient/resolver.go index f99c11fb6a..6edcbecfa0 100644 --- a/packages/orchestrator/pkg/sandbox/template/peerclient/resolver.go +++ b/packages/orchestrator/pkg/sandbox/template/peerclient/resolver.go @@ -158,14 +158,14 @@ func (r *peerResolver) peerState(buildID string) *peerState { return actual.(*peerState) } -// Purge removes the per-build state, called on template cache eviction so -// the entry doesn't accumulate forever. +// Purge removes the uploaded state for a build, called on template +// cache eviction so the entry doesn't accumulate forever. func (r *peerResolver) Purge(buildID string) { r.uploadedBuilds.Delete(buildID) } // resolve looks up the peer for the given build and returns a gRPC client if -// a remote peer is found. Returns a zero result when the base provider should +// a remote peer is found. Returns a nil client when the base provider should // be used instead (uploaded, no peer, self, or error). func (r *peerResolver) resolve(ctx context.Context, buildID string) (attribute.KeyValue, resolveResult) { // Fast path: a prior resolve flagged this build as peer-served and a