Skip to content

Commit 7e40e63

Browse files
committed
go/worker/storage: Integrate new p2p protocols
The legacy storage sync protocol is still advertised and served to enable seamless rolling upgrades of the network.
1 parent fc405de commit 7e40e63

8 files changed

Lines changed: 145 additions & 39 deletions

File tree

.changelog/5751.feature.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
go: Split storage sync p2p protocol
2+
3+
The storage sync protocol was split into two independent protocols (checkpoint
4+
and diff sync).
5+
6+
This change was made since there may be fewer nodes that expose checkpoints
7+
than storage diff. Previously, this could lead to issues with state sync
8+
when a node was connected to peers that supported the storage sync protocol
9+
but had no checkpoints available.
10+
11+
This was done in a backwards-compatible manner, so that both protocols are still
12+
advertised and used. Eventually, we plan to remove the legacy protocol.

go/oasis-node/cmd/debug/byzantine/node.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
scheduler "github.com/oasisprotocol/oasis-core/go/scheduler/api"
2323
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
2424
"github.com/oasisprotocol/oasis-core/go/worker/client"
25-
storageP2P "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
25+
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy"
2626
)
2727

2828
type byzantine struct {
@@ -154,7 +154,7 @@ func initializeAndRegisterByzantineNode(
154154
if err != nil {
155155
return nil, fmt.Errorf("initializing storage node failed: %w", err)
156156
}
157-
b.p2p.service.RegisterProtocolServer(storageP2P.NewServer(b.chainContext, b.runtimeID, storage))
157+
b.p2p.service.RegisterProtocolServer(synclegacy.NewServer(b.chainContext, b.runtimeID, storage))
158158
b.storage = storage
159159

160160
// Wait for activation epoch.

go/worker/storage/committee/checkpoint_sync.go

Lines changed: 80 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ import (
1111
"sync"
1212
"time"
1313

14+
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
1415
storageApi "github.com/oasisprotocol/oasis-core/go/storage/api"
1516
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
16-
storageSync "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
17+
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync"
18+
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy"
1719
)
1820

1921
const (
@@ -55,7 +57,7 @@ type chunk struct {
5557
*checkpoint.ChunkMetadata
5658

5759
// checkpoint points to the checkpoint this chunk originated from.
58-
checkpoint *storageSync.Checkpoint
60+
checkpoint *checkpointsync.Checkpoint
5961
}
6062

6163
type chunkHeap struct {
@@ -101,12 +103,7 @@ func (n *Node) checkpointChunkFetcher(
101103
defer cancel()
102104

103105
// Fetch chunk from peers.
104-
rsp, pf, err := n.storageSync.GetCheckpointChunk(chunkCtx, &storageSync.GetCheckpointChunkRequest{
105-
Version: chunk.Version,
106-
Root: chunk.Root,
107-
Index: chunk.Index,
108-
Digest: chunk.Digest,
109-
}, chunk.checkpoint)
106+
rsp, pf, err := n.fetchChunk(chunkCtx, chunk)
110107
if err != nil {
111108
n.logger.Error("failed to fetch chunk from peers",
112109
"err", err,
@@ -117,7 +114,7 @@ func (n *Node) checkpointChunkFetcher(
117114
}
118115

119116
// Restore fetched chunk.
120-
done, err := n.localStorage.Checkpointer().RestoreChunk(chunkCtx, chunk.Index, bytes.NewBuffer(rsp.Chunk))
117+
done, err := n.localStorage.Checkpointer().RestoreChunk(chunkCtx, chunk.Index, bytes.NewBuffer(rsp))
121118
cancel()
122119

123120
switch {
@@ -157,7 +154,47 @@ func (n *Node) checkpointChunkFetcher(
157154
}
158155
}
159156

160-
func (n *Node) handleCheckpoint(check *storageSync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) {
157+
// fetchChunk fetches chunk using checkpoint sync p2p protocol client.
158+
//
159+
// In case of no peers or error, it fallbacks to the legacy storage sync protocol.
160+
func (n *Node) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.PeerFeedback, error) {
161+
rsp1, pf, err := n.checkpointSync.GetCheckpointChunk(
162+
ctx,
163+
&checkpointsync.GetCheckpointChunkRequest{
164+
Version: chunk.Version,
165+
Root: chunk.Root,
166+
Index: chunk.Index,
167+
Digest: chunk.Digest,
168+
},
169+
&checkpointsync.Checkpoint{
170+
Metadata: chunk.checkpoint.Metadata,
171+
Peers: chunk.checkpoint.Peers,
172+
},
173+
)
174+
if err == nil { // if NO error
175+
return rsp1.Chunk, pf, nil
176+
}
177+
178+
rsp2, pf, err := n.legacyStorageSync.GetCheckpointChunk(
179+
ctx,
180+
&synclegacy.GetCheckpointChunkRequest{
181+
Version: chunk.Version,
182+
Root: chunk.Root,
183+
Index: chunk.Index,
184+
Digest: chunk.Digest,
185+
},
186+
&synclegacy.Checkpoint{
187+
Metadata: chunk.checkpoint.Metadata,
188+
Peers: chunk.checkpoint.Peers,
189+
},
190+
)
191+
if err != nil {
192+
return nil, nil, err
193+
}
194+
return rsp2.Chunk, pf, nil
195+
}
196+
197+
func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) {
161198
if err := n.localStorage.Checkpointer().StartRestore(n.ctx, check.Metadata); err != nil {
162199
// Any previous restores were already aborted by the driver up the call stack, so
163200
// things should have been going smoothly here; bail.
@@ -276,13 +313,11 @@ func (n *Node) handleCheckpoint(check *storageSync.Checkpoint, maxParallelReques
276313
}
277314
}
278315

279-
func (n *Node) getCheckpointList() ([]*storageSync.Checkpoint, error) {
316+
func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) {
280317
ctx, cancel := context.WithTimeout(n.ctx, cpListsTimeout)
281318
defer cancel()
282319

283-
list, err := n.storageSync.GetCheckpoints(ctx, &storageSync.GetCheckpointsRequest{
284-
Version: 1,
285-
})
320+
list, err := n.fetchCheckpoints(ctx)
286321
if err != nil {
287322
n.logger.Error("failed to retrieve any checkpoints",
288323
"err", err,
@@ -296,9 +331,36 @@ func (n *Node) getCheckpointList() ([]*storageSync.Checkpoint, error) {
296331
return list, nil
297332
}
298333

334+
// fetchCheckpoints fetches the list of available checkpoints (request version 1) via the checkpoint sync p2p protocol client.
335+
//
336+
// If that request returns an error or an empty list, it falls back to the legacy storage sync protocol client using the same ctx and converts the legacy results into checkpointsync.Checkpoint values; if the fallback fails, only the fallback's error is returned.
337+
func (n *Node) fetchCheckpoints(ctx context.Context) ([]*checkpointsync.Checkpoint, error) {
338+
list1, err := n.checkpointSync.GetCheckpoints(ctx, &checkpointsync.GetCheckpointsRequest{
339+
Version: 1,
340+
})
341+
if err == nil && len(list1) > 0 { // New protocol returned at least one checkpoint, no fallback needed.
342+
return list1, nil
343+
}
344+
345+
list2, err := n.legacyStorageSync.GetCheckpoints(ctx, &synclegacy.GetCheckpointsRequest{
346+
Version: 1,
347+
})
348+
if err != nil {
349+
return nil, err
350+
}
351+
var cps []*checkpointsync.Checkpoint // Stays nil when the legacy protocol returns no checkpoints.
352+
for _, cp := range list2 {
353+
cps = append(cps, &checkpointsync.Checkpoint{
354+
Metadata: cp.Metadata,
355+
Peers: cp.Peers,
356+
})
357+
}
358+
return cps, nil
359+
}
360+
299361
// sortCheckpoints sorts the slice in-place (descending by version, peers, hash).
300-
func sortCheckpoints(s []*storageSync.Checkpoint) {
301-
slices.SortFunc(s, func(a, b *storageSync.Checkpoint) int {
362+
func sortCheckpoints(s []*checkpointsync.Checkpoint) {
363+
slices.SortFunc(s, func(a, b *checkpointsync.Checkpoint) int {
302364
return cmp.Or(
303365
cmp.Compare(b.Root.Version, a.Root.Version),
304366
cmp.Compare(len(b.Peers), len(a.Peers)),
@@ -307,7 +369,7 @@ func sortCheckpoints(s []*storageSync.Checkpoint) {
307369
})
308370
}
309371

310-
func (n *Node) checkCheckpointUsable(cp *storageSync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool {
372+
func (n *Node) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool {
311373
namespace := n.commonNode.Runtime.ID()
312374
if !namespace.Equal(&cp.Root.Namespace) {
313375
// Not for the right runtime.
@@ -357,7 +419,7 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc
357419

358420
// If we only want the genesis checkpoint, filter it out.
359421
if wantOnlyGenesis && len(cps) > 0 {
360-
var filteredCps []*storageSync.Checkpoint
422+
var filteredCps []*checkpointsync.Checkpoint
361423
for _, cp := range cps {
362424
if cp.Root.Version == genesisRound {
363425
filteredCps = append(filteredCps, cp)

go/worker/storage/committee/checkpoint_sync_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,35 +8,35 @@ import (
88
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
99
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
1010
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/node"
11-
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
11+
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync"
1212
)
1313

1414
func TestSortCheckpoints(t *testing.T) {
15-
cp1 := &sync.Checkpoint{
15+
cp1 := &checkpointsync.Checkpoint{
1616
Metadata: &checkpoint.Metadata{
1717
Root: node.Root{
1818
Version: 2,
1919
},
2020
},
2121
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback(), rpc.NewNopPeerFeedback()},
2222
}
23-
cp2 := &sync.Checkpoint{
23+
cp2 := &checkpointsync.Checkpoint{
2424
Metadata: &checkpoint.Metadata{
2525
Root: node.Root{
2626
Version: 2,
2727
},
2828
},
2929
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback()},
3030
}
31-
cp3 := &sync.Checkpoint{
31+
cp3 := &checkpointsync.Checkpoint{
3232
Metadata: &checkpoint.Metadata{
3333
Root: node.Root{
3434
Version: 1,
3535
},
3636
},
3737
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback(), rpc.NewNopPeerFeedback()},
3838
}
39-
cp4 := &sync.Checkpoint{
39+
cp4 := &checkpointsync.Checkpoint{
4040
Metadata: &checkpoint.Metadata{
4141
Root: node.Root{
4242
Version: 1,
@@ -45,9 +45,9 @@ func TestSortCheckpoints(t *testing.T) {
4545
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback()},
4646
}
4747

48-
s := []*sync.Checkpoint{cp2, cp3, cp4, cp1}
48+
s := []*checkpointsync.Checkpoint{cp2, cp3, cp4, cp1}
4949

5050
sortCheckpoints(s)
5151

52-
assert.Equal(t, s, []*sync.Checkpoint{cp1, cp2, cp3, cp4})
52+
assert.Equal(t, s, []*checkpointsync.Checkpoint{cp1, cp2, cp3, cp4})
5353
}

go/worker/storage/committee/node.go

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@ import (
3232
"github.com/oasisprotocol/oasis-core/go/worker/common/committee"
3333
"github.com/oasisprotocol/oasis-core/go/worker/registration"
3434
"github.com/oasisprotocol/oasis-core/go/worker/storage/api"
35+
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync"
36+
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/diffsync"
3537
storagePub "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/pub"
36-
storageSync "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
38+
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy"
3739
)
3840

3941
var (
@@ -128,7 +130,9 @@ type Node struct { // nolint: maligned
128130

129131
localStorage storageApi.LocalBackend
130132

131-
storageSync storageSync.Client
133+
diffSync diffsync.Client
134+
checkpointSync checkpointsync.Client
135+
legacyStorageSync synclegacy.Client
132136

133137
undefinedRound uint64
134138

@@ -272,15 +276,21 @@ func NewNode(
272276
node: n,
273277
})
274278

275-
// Register storage sync service.
276-
commonNode.P2P.RegisterProtocolServer(storageSync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage))
277-
n.storageSync = storageSync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID())
278-
279-
// Register storage pub service if configured.
279+
// Advertise and serve p2p protocols.
280+
commonNode.P2P.RegisterProtocolServer(synclegacy.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage))
281+
commonNode.P2P.RegisterProtocolServer(diffsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage))
282+
if checkInterval != checkpoint.CheckIntervalDisabled {
283+
commonNode.P2P.RegisterProtocolServer(checkpointsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage))
284+
}
280285
if rpcRoleProvider != nil {
281286
commonNode.P2P.RegisterProtocolServer(storagePub.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage))
282287
}
283288

289+
// Create p2p protocol clients.
290+
n.legacyStorageSync = synclegacy.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID())
291+
n.diffSync = diffsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID())
292+
n.checkpointSync = checkpointsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID())
293+
284294
return n, nil
285295
}
286296

@@ -430,13 +440,29 @@ func (n *Node) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) {
430440
ctx, cancel := context.WithCancel(n.ctx)
431441
defer cancel()
432442

433-
rsp, pf, err := n.storageSync.GetDiff(ctx, &storageSync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot})
443+
wl, pf, err := n.getDiff(ctx, prevRoot, thisRoot)
434444
if err != nil {
435445
result.err = err
436446
return
437447
}
438448
result.pf = pf
439-
result.writeLog = rsp.WriteLog
449+
result.writeLog = wl
450+
}
451+
452+
// getDiff fetches the write log between prevRoot and thisRoot via the diff sync p2p protocol client and returns it plus the peer feedback from whichever client served it.
453+
//
454+
// If that request returns any error (presumably including the no-peers case; confirm against the client), it falls back to the legacy storage sync protocol client using the same ctx; if the fallback also fails, only the fallback's error is returned.
455+
func (n *Node) getDiff(ctx context.Context, prevRoot, thisRoot storageApi.Root) (storageApi.WriteLog, rpc.PeerFeedback, error) {
456+
rsp1, pf, err := n.diffSync.GetDiff(ctx, &diffsync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot})
457+
if err == nil { // New protocol succeeded, no fallback needed.
458+
return rsp1.WriteLog, pf, nil
459+
}
460+
461+
rsp2, pf, err := n.legacyStorageSync.GetDiff(ctx, &synclegacy.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot})
462+
if err != nil {
463+
return nil, nil, err
464+
}
465+
return rsp2.WriteLog, pf, nil
440466
}
441467

442468
func (n *Node) finalize(summary *blockSummary) {

go/worker/storage/p2p/sync/client.go renamed to go/worker/storage/p2p/synclegacy/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package sync
1+
package synclegacy
22

33
import (
44
"context"

go/worker/storage/p2p/sync/protocol.go renamed to go/worker/storage/p2p/synclegacy/protocol.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
package sync
1+
// Package synclegacy defines the wire protocol together with client/server
2+
// implementations for the legacy storage sync protocol, used for runtime block sync.
3+
//
4+
// The protocol was split into the storage diff and checkpoint sync protocols.
5+
//
6+
// TODO: Remove it: https://github.com/oasisprotocol/oasis-core/issues/6261
7+
package synclegacy
28

39
import (
410
"time"

go/worker/storage/p2p/sync/server.go renamed to go/worker/storage/p2p/synclegacy/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package sync
1+
package synclegacy
22

33
import (
44
"bytes"

0 commit comments

Comments
 (0)