Skip to content

Commit 6d217f9

Browse files
authored
fix(peerclient): resolve base path per-call from live FrameTable (#2585)
After a peer→storage transition, peerSeekable kept opening base against the original (uncompressed) path captured at construction. With the recent compression work, post-transition reads now target a compressed object, so the cached base resolved to a non-existent path. P2P routing produced the stale binding (it captured a path while the build was still uncompressed); it also contains the fix. peerSeekable now holds (buildID, basic name, base provider, objType) and composes the actual storage path at base-open time using the CompressionType from the live FrameTable. The base seekable is reopened on ct change. No changes outside peerclient.
1 parent a67f983 commit 6d217f9

10 files changed

Lines changed: 479 additions & 318 deletions

File tree

packages/orchestrator/chunks.proto

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ message PeerAvailability {
2020

2121
message GetBuildFileSizeRequest {
2222
string build_id = 1;
23-
// file_name is one of the seekable diff files: "memfile", "rootfs.ext4"
24-
string file_name = 2;
23+
// name is one of the seekable diff files: "memfile", "rootfs.ext4"
24+
string name = 2;
2525
}
2626

2727
message GetBuildFileSizeResponse {
@@ -31,8 +31,8 @@ message GetBuildFileSizeResponse {
3131

3232
message GetBuildFileExistsRequest {
3333
string build_id = 1;
34-
// file_name is one of: "snapfile", "metadata.json"
35-
string file_name = 2;
34+
// name is one of: "snapfile", "metadata.json"
35+
string name = 2;
3636
}
3737

3838
message GetBuildFileExistsResponse {
@@ -41,7 +41,7 @@ message GetBuildFileExistsResponse {
4141

4242
message ReadAtBuildSeekableRequest {
4343
string build_id = 1;
44-
string file_name = 2;
44+
string name = 2;
4545
int64 offset = 3;
4646
int64 length = 4;
4747
}
@@ -54,8 +54,8 @@ message ReadAtBuildSeekableResponse {
5454

5555
message GetBuildBlobRequest {
5656
string build_id = 1;
57-
// file_name is one of: "snapfile", "metadata.json", "memfile.header", "rootfs.ext4.header"
58-
string file_name = 2;
57+
// name is one of: "snapfile", "metadata.json", "memfile.header", "rootfs.ext4.header"
58+
string name = 2;
5959
}
6060

6161
message GetBuildBlobResponse {

packages/orchestrator/pkg/sandbox/template/peerclient/blob.go

Lines changed: 61 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"io"
8+
"sync"
89
"sync/atomic"
910

1011
"go.uber.org/zap"
@@ -16,22 +17,49 @@ import (
1617

1718
var _ storage.Blob = (*peerBlob)(nil)
1819

20+
// peerBlob reads from the peer first; on fallthrough, opens base lazily.
21+
// The base path is fixed at construction (blobs are not compressed).
1922
type peerBlob struct {
20-
peerHandle[storage.Blob]
23+
peerHandle
24+
25+
openBase func(ctx context.Context) (storage.Blob, error)
26+
27+
mu sync.Mutex
28+
base storage.Blob
29+
loaded bool
30+
}
31+
32+
func (b *peerBlob) getBase(ctx context.Context) (storage.Blob, error) {
33+
b.mu.Lock()
34+
defer b.mu.Unlock()
35+
36+
if b.loaded {
37+
return b.base, nil
38+
}
39+
40+
base, err := b.openBase(ctx)
41+
if err != nil {
42+
return nil, err
43+
}
44+
45+
b.base = base
46+
b.loaded = true
47+
48+
return base, nil
2149
}
2250

2351
func (b *peerBlob) WriteTo(ctx context.Context, dst io.Writer) (int64, error) {
24-
return withPeerFallback(ctx, &b.peerHandle, "peer-blob-write-to", attrOpWriteTo,
52+
res, err := tryPeer(ctx, &b.peerHandle, "peer-blob-write-to", attrOpWriteTo,
2553
func(ctx context.Context) (peerAttempt[int64], error) {
2654
streamCtx, cancel := context.WithCancel(ctx)
2755

2856
recv, err := openPeerBlobStream(streamCtx, b.client, &orchestrator.GetBuildBlobRequest{
29-
BuildId: b.buildID,
30-
FileName: b.fileName,
57+
BuildId: b.buildID,
58+
Name: b.name,
3159
}, b.uploaded)
3260
if err != nil {
3361
cancel()
34-
logger.L().Warn(ctx, "failed to open peer blob stream", logger.WithBuildID(b.buildID), zap.String("file_name", b.fileName), zap.Error(err))
62+
logger.L().Warn(ctx, "failed to open peer blob stream", logger.WithBuildID(b.buildID), zap.String("file_name", b.name), zap.Error(err))
3563

3664
return peerAttempt[int64]{}, nil
3765
}
@@ -42,43 +70,55 @@ func (b *peerBlob) WriteTo(ctx context.Context, dst io.Writer) (int64, error) {
4270
n, err := io.Copy(dst, reader)
4371
if err != nil {
4472
return peerAttempt[int64]{value: n, bytes: n, hit: true},
45-
fmt.Errorf("failed to stream file %q from peer: %w", b.fileName, err)
73+
fmt.Errorf("failed to stream file %q from peer: %w", b.name, err)
4674
}
4775

4876
return peerAttempt[int64]{value: n, bytes: n, hit: true}, nil
49-
},
50-
func(ctx context.Context, base storage.Blob) (int64, error) {
51-
return base.WriteTo(ctx, dst)
52-
},
53-
)
77+
})
78+
if res.hit {
79+
return res.value, err
80+
}
81+
82+
base, err := b.getBase(ctx)
83+
if err != nil {
84+
return 0, err
85+
}
86+
87+
return base.WriteTo(ctx, dst)
5488
}
5589

5690
func (b *peerBlob) Exists(ctx context.Context) (bool, error) {
57-
return withPeerFallback(ctx, &b.peerHandle, "peer-blob-exists", attrOpExists,
91+
res, err := tryPeer(ctx, &b.peerHandle, "peer-blob-exists", attrOpExists,
5892
func(ctx context.Context) (peerAttempt[bool], error) {
5993
resp, err := b.client.GetBuildFileExists(ctx, &orchestrator.GetBuildFileExistsRequest{
60-
BuildId: b.buildID,
61-
FileName: b.fileName,
94+
BuildId: b.buildID,
95+
Name: b.name,
6296
})
6397
if err == nil && checkPeerAvailability(resp.GetAvailability(), b.uploaded) {
6498
return peerAttempt[bool]{value: true, hit: true}, nil
6599
}
66100

67101
if err != nil {
68-
logger.L().Warn(ctx, "failed to check build file exists from peer", logger.WithBuildID(b.buildID), zap.String("file_name", b.fileName), zap.Error(err))
102+
logger.L().Warn(ctx, "failed to check build file exists from peer", logger.WithBuildID(b.buildID), zap.String("file_name", b.name), zap.Error(err))
69103
}
70104

71105
return peerAttempt[bool]{}, nil
72-
},
73-
func(ctx context.Context, base storage.Blob) (bool, error) {
74-
return base.Exists(ctx)
75-
},
76-
)
106+
})
107+
if res.hit {
108+
return res.value, err
109+
}
110+
111+
base, err := b.getBase(ctx)
112+
if err != nil {
113+
return false, err
114+
}
115+
116+
return base.Exists(ctx)
77117
}
78118

79119
func (b *peerBlob) Put(ctx context.Context, data []byte, opts ...storage.PutOption) error {
80120
// Writes always go to the base provider (GCS/S3); the peer is read-only.
81-
fallback, err := b.getOrOpenBase(ctx)
121+
fallback, err := b.getBase(ctx)
82122
if err != nil {
83123
return err
84124
}

packages/orchestrator/pkg/sandbox/template/peerclient/blob_test.go

Lines changed: 50 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@ func TestPeerBlob_WriteTo_PeerSucceeds(t *testing.T) {
2727

2828
client := orchestratormocks.NewMockChunkServiceClient(t)
2929
client.EXPECT().GetBuildBlob(mock.Anything, mock.MatchedBy(func(req *orchestrator.GetBuildBlobRequest) bool {
30-
return req.GetBuildId() == "build-1" && req.GetFileName() == "snapfile"
30+
return req.GetBuildId() == "build-1" && req.GetName() == "snapfile"
3131
})).Return(stream, nil)
3232

33-
blob := &peerBlob{peerHandle: peerHandle[storage.Blob]{
33+
blob := &peerBlob{peerHandle: peerHandle{
3434
client: client,
3535
buildID: "build-1",
36-
fileName: "snapfile",
36+
name: "snapfile",
3737
uploaded: &atomic.Bool{},
3838
}}
3939

@@ -62,15 +62,17 @@ func TestPeerBlob_WriteTo_PeerNotAvailable_FallsBackToBase(t *testing.T) {
6262
base := storage.NewMockStorageProvider(t)
6363
base.EXPECT().OpenBlob(mock.Anything, "build-1/snapfile", storage.SnapfileObjectType).Return(baseBlob, nil)
6464

65-
blob := &peerBlob{peerHandle: peerHandle[storage.Blob]{
66-
client: client,
67-
buildID: "build-1",
68-
fileName: "snapfile",
69-
uploaded: &atomic.Bool{},
70-
openFn: func(ctx context.Context) (storage.Blob, error) {
65+
blob := &peerBlob{
66+
peerHandle: peerHandle{
67+
client: client,
68+
buildID: "build-1",
69+
name: "snapfile",
70+
uploaded: &atomic.Bool{},
71+
},
72+
openBase: func(ctx context.Context) (storage.Blob, error) {
7173
return base.OpenBlob(ctx, "build-1/snapfile", storage.SnapfileObjectType)
7274
},
73-
}}
75+
}
7476

7577
var buf bytes.Buffer
7678
n, err := blob.WriteTo(t.Context(), &buf)
@@ -94,15 +96,17 @@ func TestPeerBlob_WriteTo_PeerError_FallsBackToBase(t *testing.T) {
9496
base := storage.NewMockStorageProvider(t)
9597
base.EXPECT().OpenBlob(mock.Anything, "build-1/snapfile", storage.SnapfileObjectType).Return(baseBlob, nil)
9698

97-
blob := &peerBlob{peerHandle: peerHandle[storage.Blob]{
98-
client: client,
99-
buildID: "build-1",
100-
fileName: "snapfile",
101-
uploaded: &atomic.Bool{},
102-
openFn: func(ctx context.Context) (storage.Blob, error) {
99+
blob := &peerBlob{
100+
peerHandle: peerHandle{
101+
client: client,
102+
buildID: "build-1",
103+
name: "snapfile",
104+
uploaded: &atomic.Bool{},
105+
},
106+
openBase: func(ctx context.Context) (storage.Blob, error) {
103107
return base.OpenBlob(ctx, "build-1/snapfile", storage.SnapfileObjectType)
104108
},
105-
}}
109+
}
106110

107111
var buf bytes.Buffer
108112
_, err := blob.WriteTo(t.Context(), &buf)
@@ -139,15 +143,17 @@ func TestPeerBlob_WriteTo_UploadedSetMidStream_CompletesFromPeerThenFallsBack(t
139143
base := storage.NewMockStorageProvider(t)
140144
base.EXPECT().OpenBlob(mock.Anything, "build-1/snapfile", storage.SnapfileObjectType).Return(baseBlob, nil)
141145

142-
blob := &peerBlob{peerHandle: peerHandle[storage.Blob]{
143-
client: client,
144-
buildID: "build-1",
145-
fileName: "snapfile",
146-
uploaded: uploaded,
147-
openFn: func(ctx context.Context) (storage.Blob, error) {
146+
blob := &peerBlob{
147+
peerHandle: peerHandle{
148+
client: client,
149+
buildID: "build-1",
150+
name: "snapfile",
151+
uploaded: uploaded,
152+
},
153+
openBase: func(ctx context.Context) (storage.Blob, error) {
148154
return base.OpenBlob(ctx, "build-1/snapfile", storage.SnapfileObjectType)
149155
},
150-
}}
156+
}
151157

152158
// First download: in-flight stream completes from peer despite uploaded being set mid-stream.
153159
var buf1 bytes.Buffer
@@ -170,10 +176,10 @@ func TestPeerBlob_Exists_PeerHasFile(t *testing.T) {
170176

171177
client := orchestratormocks.NewMockChunkServiceClient(t)
172178
client.EXPECT().GetBuildFileExists(mock.Anything, mock.MatchedBy(func(req *orchestrator.GetBuildFileExistsRequest) bool {
173-
return req.GetBuildId() == "build-1" && req.GetFileName() == "snapfile"
179+
return req.GetBuildId() == "build-1" && req.GetName() == "snapfile"
174180
})).Return(&orchestrator.GetBuildFileExistsResponse{}, nil)
175181

176-
blob := &peerBlob{peerHandle: peerHandle[storage.Blob]{client: client, buildID: "build-1", fileName: "snapfile", uploaded: &atomic.Bool{}}}
182+
blob := &peerBlob{peerHandle: peerHandle{client: client, buildID: "build-1", name: "snapfile", uploaded: &atomic.Bool{}}}
177183
ok, err := blob.Exists(t.Context())
178184
require.NoError(t, err)
179185
assert.True(t, ok)
@@ -190,15 +196,17 @@ func TestPeerBlob_Exists_PeerNotAvailable_FallsBackToBase(t *testing.T) {
190196
base := storage.NewMockStorageProvider(t)
191197
base.EXPECT().OpenBlob(mock.Anything, "build-1/snapfile", storage.SnapfileObjectType).Return(baseBlob, nil)
192198

193-
blob := &peerBlob{peerHandle: peerHandle[storage.Blob]{
194-
client: client,
195-
buildID: "build-1",
196-
fileName: "snapfile",
197-
uploaded: &atomic.Bool{},
198-
openFn: func(ctx context.Context) (storage.Blob, error) {
199+
blob := &peerBlob{
200+
peerHandle: peerHandle{
201+
client: client,
202+
buildID: "build-1",
203+
name: "snapfile",
204+
uploaded: &atomic.Bool{},
205+
},
206+
openBase: func(ctx context.Context) (storage.Blob, error) {
199207
return base.OpenBlob(ctx, "build-1/snapfile", storage.SnapfileObjectType)
200208
},
201-
}}
209+
}
202210

203211
ok, err := blob.Exists(t.Context())
204212
require.NoError(t, err)
@@ -217,15 +225,17 @@ func TestPeerBlob_Exists_UseStorage_FallsBackToBase(t *testing.T) {
217225
base.EXPECT().OpenBlob(mock.Anything, "build-1/snapfile", storage.SnapfileObjectType).Return(baseBlob, nil)
218226

219227
uploaded := &atomic.Bool{}
220-
blob := &peerBlob{peerHandle: peerHandle[storage.Blob]{
221-
client: client,
222-
buildID: "build-1",
223-
fileName: "snapfile",
224-
uploaded: uploaded,
225-
openFn: func(ctx context.Context) (storage.Blob, error) {
228+
blob := &peerBlob{
229+
peerHandle: peerHandle{
230+
client: client,
231+
buildID: "build-1",
232+
name: "snapfile",
233+
uploaded: uploaded,
234+
},
235+
openBase: func(ctx context.Context) (storage.Blob, error) {
226236
return base.OpenBlob(ctx, "build-1/snapfile", storage.SnapfileObjectType)
227237
},
228-
}}
238+
}
229239

230240
ok, err := blob.Exists(t.Context())
231241
require.NoError(t, err)

0 commit comments

Comments
 (0)