diff --git a/.changelog/6262.feature.md b/.changelog/6262.feature.md new file mode 100644 index 00000000000..766413d7ce9 --- /dev/null +++ b/.changelog/6262.feature.md @@ -0,0 +1,12 @@ +go: Split storage sync p2p protocol + +The storage sync protocol was split into two independent protocols (checkpoint +and diff sync). + +This change was made since there may be fewer nodes that expose checkpoints +than storage diffs. Previously, this could lead to issues with state sync +when a node was connected with peers that supported the storage sync protocol +but had no checkpoints available. + +This was done in a backwards-compatible manner, so that both protocols are +still advertised and used. Eventually, we plan to remove the legacy protocol. diff --git a/go/oasis-node/cmd/debug/byzantine/node.go b/go/oasis-node/cmd/debug/byzantine/node.go index 447def2d766..f1a69dfa543 100644 --- a/go/oasis-node/cmd/debug/byzantine/node.go +++ b/go/oasis-node/cmd/debug/byzantine/node.go @@ -22,7 +22,7 @@ import ( scheduler "github.com/oasisprotocol/oasis-core/go/scheduler/api" storage "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/worker/client" - storageP2P "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync" + storageP2P "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy" ) type byzantine struct { diff --git a/go/p2p/p2p.go b/go/p2p/p2p.go index c74b243989f..b47980b0601 100644 --- a/go/p2p/p2p.go +++ b/go/p2p/p2p.go @@ -83,6 +83,8 @@ type p2p struct { registerAddresses []multiaddr.Multiaddr topics map[string]*topicHandler + protocolRegistry *protocol.Registry + logger *logging.Logger } @@ -281,7 +283,7 @@ func (p *p2p) Publish(_ context.Context, topic string, msg any) { // Implements api.Service.
func (p *p2p) RegisterHandler(topic string, handler api.Handler) { - protocol.ValidateTopicID(topic) + p.protocolRegistry.ValidateTopicID(topic) p.Lock() defer p.Unlock() @@ -337,7 +339,7 @@ func (p *p2p) PeerManager() api.PeerManager { // Implements api.Service. func (p *p2p) RegisterProtocolServer(srv rpc.Server) { - protocol.ValidateProtocolID(srv.Protocol()) + p.protocolRegistry.ValidateProtocolID(srv.Protocol()) p.host.SetStreamHandler(srv.Protocol(), srv.HandleStream) @@ -439,6 +441,7 @@ func New(identity *identity.Identity, chainContext string, store *persistent.Com pubsub: pubsub, registerAddresses: cfg.Addresses, topics: make(map[string]*topicHandler), + protocolRegistry: protocol.NewRegistry(), logger: logger, }, nil } diff --git a/go/p2p/protocol/protocol.go b/go/p2p/protocol/protocol.go index 9504d006d30..835a24226ad 100644 --- a/go/p2p/protocol/protocol.go +++ b/go/p2p/protocol/protocol.go @@ -12,33 +12,32 @@ import ( "github.com/oasisprotocol/oasis-core/go/p2p/api" ) -type protocolRegistry struct { +// Registry is responsible for ensuring unique protocol ids. +type Registry struct { mu sync.Mutex protocols map[core.ProtocolID]struct{} } -func newProtocolRegistry() *protocolRegistry { - return &protocolRegistry{ +func NewRegistry() *Registry { + return &Registry{ protocols: make(map[core.ProtocolID]struct{}), } } -var registry = newProtocolRegistry() - // ValidateProtocolID panics if the protocol id is not unique. -func ValidateProtocolID(p core.ProtocolID) { - registry.mu.Lock() - defer registry.mu.Unlock() +func (r *Registry) ValidateProtocolID(p core.ProtocolID) { + r.mu.Lock() + defer r.mu.Unlock() - if _, ok := registry.protocols[p]; ok { + if _, ok := r.protocols[p]; ok { panic(fmt.Sprintf("p2p/protocol: protocol or topic with name '%s' already exists", p)) } - registry.protocols[p] = struct{}{} + r.protocols[p] = struct{}{} } // ValidateTopicID panics if the topic id is not unique. 
-func ValidateTopicID(topic string) { - ValidateProtocolID(core.ProtocolID(topic)) +func (r *Registry) ValidateTopicID(topic string) { + r.ValidateProtocolID(core.ProtocolID(topic)) } // NewProtocolID generates a protocol identifier for a consensus P2P protocol. diff --git a/go/p2p/protocol/protocol_test.go b/go/p2p/protocol/protocol_test.go index 73c0a218934..470c663f537 100644 --- a/go/p2p/protocol/protocol_test.go +++ b/go/p2p/protocol/protocol_test.go @@ -56,11 +56,10 @@ func TestProtocolID(t *testing.T) { require.Equal(expected, NewTopicIDForRuntime(chainContext, runtimeID, kind, version)) }) - registry = newProtocolRegistry() - t.Run("ValidateProtocolID", func(_ *testing.T) { - ValidateProtocolID("protocol-1") - ValidateProtocolID("protocol-2") + r := NewRegistry() + r.ValidateProtocolID("protocol-1") + r.ValidateProtocolID("protocol-2") }) t.Run("ValidateProtocolID panics", func(t *testing.T) { @@ -69,9 +68,8 @@ func TestProtocolID(t *testing.T) { t.Errorf("validate protocol id should fail") } }() - ValidateProtocolID("protocol") - ValidateProtocolID("protocol") + r := NewRegistry() + r.ValidateProtocolID("protocol") + r.ValidateProtocolID("protocol") }) - - registry = newProtocolRegistry() } diff --git a/go/p2p/rpc/client.go b/go/p2p/rpc/client.go index 888045ad471..6ad8869f625 100644 --- a/go/p2p/rpc/client.go +++ b/go/p2p/rpc/client.go @@ -269,8 +269,8 @@ type Client interface { } type client struct { - host core.Host - protocolID protocol.ID + host core.Host + protocolIDs []protocol.ID listeners struct { sync.RWMutex @@ -486,7 +486,7 @@ func (c *client) call( stream, err := c.host.NewStream( ctx, peerID, - c.protocolID, + c.protocolIDs..., ) if err != nil { return fmt.Errorf("failed to open stream: %w", err) @@ -612,15 +612,15 @@ func retryFn(ctx context.Context, fn func() error, maxRetries uint64, retryInter } // NewClient creates a new RPC client for the given protocol. 
-func NewClient(h host.Host, p protocol.ID) Client { +func NewClient(h host.Host, p protocol.ID, fallback ...protocol.ID) Client { if h == nil { // No P2P service, use the no-op client. return &nopClient{} } return &client{ - host: h, - protocolID: p, + host: h, + protocolIDs: append([]protocol.ID{p}, fallback...), listeners: struct { sync.RWMutex m map[ClientListener]struct{} diff --git a/go/storage/mkvs/checkpoint/checkpoint.go b/go/storage/mkvs/checkpoint/checkpoint.go index 15e807e6c75..a4375c8b983 100644 --- a/go/storage/mkvs/checkpoint/checkpoint.go +++ b/go/storage/mkvs/checkpoint/checkpoint.go @@ -48,7 +48,12 @@ type ChunkProvider interface { // GetCheckpointsRequest is a GetCheckpoints request. type GetCheckpointsRequest struct { - Version uint16 `json:"version"` + // Version is version of the checkpoint request. + Version uint16 `json:"version"` + // Namespace is the namespace the checkpoints are for. + // + // Existing server implementation may ignore validation of this field. + // Best practice is to still pass it, but not rely on the actual validation. Namespace common.Namespace `json:"namespace"` // RootVersion specifies an optional root version to limit the request to. If specified, only diff --git a/go/worker/storage/committee/checkpoint_sync.go b/go/worker/storage/committee/checkpoint_sync.go index c9236f4cada..e4a3d0e47c4 100644 --- a/go/worker/storage/committee/checkpoint_sync.go +++ b/go/worker/storage/committee/checkpoint_sync.go @@ -13,7 +13,7 @@ import ( storageApi "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" - storageSync "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync" ) const ( @@ -55,7 +55,7 @@ type chunk struct { *checkpoint.ChunkMetadata // checkpoint points to the checkpoint this chunk originated from. 
- checkpoint *storageSync.Checkpoint + checkpoint *checkpointsync.Checkpoint } type chunkHeap struct { @@ -101,7 +101,7 @@ func (n *Node) checkpointChunkFetcher( defer cancel() // Fetch chunk from peers. - rsp, pf, err := n.storageSync.GetCheckpointChunk(chunkCtx, &storageSync.GetCheckpointChunkRequest{ + rsp, pf, err := n.checkpointSync.GetCheckpointChunk(chunkCtx, &checkpointsync.GetCheckpointChunkRequest{ Version: chunk.Version, Root: chunk.Root, Index: chunk.Index, @@ -157,7 +157,7 @@ func (n *Node) checkpointChunkFetcher( } } -func (n *Node) handleCheckpoint(check *storageSync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) { +func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) { if err := n.localStorage.Checkpointer().StartRestore(n.ctx, check.Metadata); err != nil { // Any previous restores were already aborted by the driver up the call stack, so // things should have been going smoothly here; bail. @@ -276,11 +276,11 @@ func (n *Node) handleCheckpoint(check *storageSync.Checkpoint, maxParallelReques } } -func (n *Node) getCheckpointList() ([]*storageSync.Checkpoint, error) { +func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) { ctx, cancel := context.WithTimeout(n.ctx, cpListsTimeout) defer cancel() - list, err := n.storageSync.GetCheckpoints(ctx, &storageSync.GetCheckpointsRequest{ + list, err := n.checkpointSync.GetCheckpoints(ctx, &checkpointsync.GetCheckpointsRequest{ Version: 1, }) if err != nil { @@ -297,8 +297,8 @@ func (n *Node) getCheckpointList() ([]*storageSync.Checkpoint, error) { } // sortCheckpoints sorts the slice in-place (descending by version, peers, hash). 
-func sortCheckpoints(s []*storageSync.Checkpoint) { - slices.SortFunc(s, func(a, b *storageSync.Checkpoint) int { +func sortCheckpoints(s []*checkpointsync.Checkpoint) { + slices.SortFunc(s, func(a, b *checkpointsync.Checkpoint) int { return cmp.Or( cmp.Compare(b.Root.Version, a.Root.Version), cmp.Compare(len(b.Peers), len(a.Peers)), @@ -307,7 +307,7 @@ func sortCheckpoints(s []*storageSync.Checkpoint) { }) } -func (n *Node) checkCheckpointUsable(cp *storageSync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool { +func (n *Node) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool { namespace := n.commonNode.Runtime.ID() if !namespace.Equal(&cp.Root.Namespace) { // Not for the right runtime. @@ -357,7 +357,7 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc // If we only want the genesis checkpoint, filter it out. if wantOnlyGenesis && len(cps) > 0 { - var filteredCps []*storageSync.Checkpoint + var filteredCps []*checkpointsync.Checkpoint for _, cp := range cps { if cp.Root.Version == genesisRound { filteredCps = append(filteredCps, cp) diff --git a/go/worker/storage/committee/checkpoint_sync_test.go b/go/worker/storage/committee/checkpoint_sync_test.go index 2e0ff2c206d..d39e50f3239 100644 --- a/go/worker/storage/committee/checkpoint_sync_test.go +++ b/go/worker/storage/committee/checkpoint_sync_test.go @@ -8,11 +8,11 @@ import ( "github.com/oasisprotocol/oasis-core/go/p2p/rpc" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/node" - "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync" ) func TestSortCheckpoints(t *testing.T) { - cp1 := &sync.Checkpoint{ + cp1 := &checkpointsync.Checkpoint{ Metadata: &checkpoint.Metadata{ Root: node.Root{ Version: 2, @@ -20,7 +20,7 @@ func TestSortCheckpoints(t 
*testing.T) { }, Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback(), rpc.NewNopPeerFeedback()}, } - cp2 := &sync.Checkpoint{ + cp2 := &checkpointsync.Checkpoint{ Metadata: &checkpoint.Metadata{ Root: node.Root{ Version: 2, @@ -28,7 +28,7 @@ func TestSortCheckpoints(t *testing.T) { }, Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback()}, } - cp3 := &sync.Checkpoint{ + cp3 := &checkpointsync.Checkpoint{ Metadata: &checkpoint.Metadata{ Root: node.Root{ Version: 1, @@ -36,7 +36,7 @@ func TestSortCheckpoints(t *testing.T) { }, Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback(), rpc.NewNopPeerFeedback()}, } - cp4 := &sync.Checkpoint{ + cp4 := &checkpointsync.Checkpoint{ Metadata: &checkpoint.Metadata{ Root: node.Root{ Version: 1, @@ -45,9 +45,9 @@ func TestSortCheckpoints(t *testing.T) { Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback()}, } - s := []*sync.Checkpoint{cp2, cp3, cp4, cp1} + s := []*checkpointsync.Checkpoint{cp2, cp3, cp4, cp1} sortCheckpoints(s) - assert.Equal(t, s, []*sync.Checkpoint{cp1, cp2, cp3, cp4}) + assert.Equal(t, s, []*checkpointsync.Checkpoint{cp1, cp2, cp3, cp4}) } diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/node.go index 9832cb98648..29329db45aa 100644 --- a/go/worker/storage/committee/node.go +++ b/go/worker/storage/committee/node.go @@ -32,8 +32,10 @@ import ( "github.com/oasisprotocol/oasis-core/go/worker/common/committee" "github.com/oasisprotocol/oasis-core/go/worker/registration" "github.com/oasisprotocol/oasis-core/go/worker/storage/api" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/diffsync" storagePub "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/pub" - storageSync "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy" ) var ( @@ -128,7 +130,8 @@ type Node struct { // nolint: maligned localStorage 
storageApi.LocalBackend - storageSync storageSync.Client + diffSync diffsync.Client + checkpointSync checkpointsync.Client undefinedRound uint64 @@ -272,9 +275,19 @@ func NewNode( node: n, }) - // Register storage sync service. - commonNode.P2P.RegisterProtocolServer(storageSync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) - n.storageSync = storageSync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + // Advertise and serve legacy storage sync protocol. + commonNode.P2P.RegisterProtocolServer(synclegacy.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + + // Register diff sync protocol server. + commonNode.P2P.RegisterProtocolServer(diffsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + // Register checkpoint sync protocol server if checkpoints are enabled. + if checkInterval < checkpoint.CheckIntervalDisabled { + commonNode.P2P.RegisterProtocolServer(checkpointsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + } + + // Create diff and checkpoint sync p2p protocol clients that have a fallback to the old legacy storage sync protocol. + n.diffSync = diffsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + n.checkpointSync = checkpointsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) // Register storage pub service if configured. 
if rpcRoleProvider != nil { @@ -430,7 +443,7 @@ func (n *Node) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { ctx, cancel := context.WithCancel(n.ctx) defer cancel() - rsp, pf, err := n.storageSync.GetDiff(ctx, &storageSync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) + rsp, pf, err := n.diffSync.GetDiff(ctx, &diffsync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) if err != nil { result.err = err return diff --git a/go/worker/storage/p2p/checkpointsync/client.go b/go/worker/storage/p2p/checkpointsync/client.go new file mode 100644 index 00000000000..c662bb742c9 --- /dev/null +++ b/go/worker/storage/p2p/checkpointsync/client.go @@ -0,0 +1,151 @@ +package checkpointsync + +import ( + "context" + + "github.com/libp2p/go-libp2p/core" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" + "github.com/oasisprotocol/oasis-core/go/p2p/protocol" + "github.com/oasisprotocol/oasis-core/go/p2p/rpc" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy" +) + +const ( + // minProtocolPeers is the minimum number of peers from the registry we want to have connected + // for checkpoint sync protocol. + minProtocolPeers = 5 + + // totalProtocolPeers is the number of peers we want to have connected for checkpoint sync protocol. + totalProtocolPeers = 10 +) + +// Client is a checkpoint sync protocol client. +type Client interface { + // GetCheckpoints returns a list of checkpoint metadata for all known checkpoints. + GetCheckpoints(ctx context.Context, request *GetCheckpointsRequest) ([]*Checkpoint, error) + + // GetCheckpointChunk requests a specific checkpoint chunk. + GetCheckpointChunk( + ctx context.Context, + request *GetCheckpointChunkRequest, + cp *Checkpoint, + ) (*GetCheckpointChunkResponse, rpc.PeerFeedback, error) + + // IsReady is true when protocol client is aware of at least one remote peer. 
+ IsReady() bool +} + +// Checkpoint contains checkpoint metadata together with peer information. +type Checkpoint struct { + *checkpoint.Metadata + + // Peers are the feedback structures of all the peers that have advertised this checkpoint. + Peers []rpc.PeerFeedback +} + +type client struct { + rc rpc.Client + mgr rpc.PeerManager + fallbackMgr rpc.PeerManager +} + +func (c *client) GetCheckpoints(ctx context.Context, request *GetCheckpointsRequest) ([]*Checkpoint, error) { + var rsp GetCheckpointsResponse + rsps, pfs, err := c.rc.CallMulti(ctx, c.getBestPeers(), MethodGetCheckpoints, request, rsp) + if err != nil { + return nil, err + } + + // Combine deduplicated results into a single result. + var checkpoints []*Checkpoint + cps := make(map[hash.Hash]*Checkpoint) + for i, peerRsp := range rsps { + peerCps := peerRsp.(*GetCheckpointsResponse).Checkpoints + + for _, cpMeta := range peerCps { + h := cpMeta.EncodedHash() + cp := cps[h] + if cp == nil { + cp = &Checkpoint{ + Metadata: cpMeta, + } + cps[h] = cp + checkpoints = append(checkpoints, cp) + } + cp.Peers = append(cp.Peers, pfs[i]) + } + + // Record success for a peer if it returned at least one checkpoint. + if len(peerCps) > 0 { + pfs[i].RecordSuccess() + } + } + return checkpoints, nil +} + +func (c *client) GetCheckpointChunk( + ctx context.Context, + request *GetCheckpointChunkRequest, + cp *Checkpoint, +) (*GetCheckpointChunkResponse, rpc.PeerFeedback, error) { + var opts []rpc.BestPeersOption + // When a checkpoint is passed, we limit requests to only those peers that actually advertised + // having the checkpoint in question to avoid needless requests. + if cp != nil { + peers := make([]core.PeerID, 0, len(cp.Peers)) + for _, pf := range cp.Peers { + peers = append(peers, pf.PeerID()) + } + opts = append(opts, rpc.WithLimitPeers(peers)) + } + + var rsp GetCheckpointChunkResponse + peers := c.getBestPeers(opts...) 
+ pf, err := c.rc.CallOne(ctx, peers, MethodGetCheckpointChunk, request, &rsp, + rpc.WithMaxPeerResponseTime(MaxGetCheckpointChunkResponseTime), + ) + if err != nil { + return nil, nil, err + } + return &rsp, pf, nil +} + +func (c *client) getBestPeers(opts ...rpc.BestPeersOption) []core.PeerID { + return append(c.mgr.GetBestPeers(opts...), c.fallbackMgr.GetBestPeers(opts...)...) +} + +func (c *client) IsReady() bool { + return len(c.getBestPeers()) > 0 +} + +// NewClient creates a new checkpoint sync protocol client. +// +// Previously, it was part of the storage sync protocol. To enable seamless rolling +// upgrades of the network, this client has a fallback to the old legacy protocol. +// The new protocol is prioritized. +// +// Finally, it ensures underlying p2p service starts tracking protocol peers +// for both new and legacy protocol. +func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client { + pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, CheckpointSyncProtocolID, CheckpointSyncProtocolVersion) + fallbackPid := synclegacy.GetStorageSyncProtocolID(chainContext, runtimeID) + rc := rpc.NewClient(p2p.Host(), pid, fallbackPid) + mgr := rpc.NewPeerManager(p2p, pid) + rc.RegisterListener(mgr) + + // Fallback protocol requires a separate manager to manage peers that also support legacy protocol. 
+ fallbackMgr := rpc.NewPeerManager(p2p, fallbackPid) + rc.RegisterListener(fallbackMgr) + + p2p.RegisterProtocol(pid, minProtocolPeers, totalProtocolPeers) + p2p.RegisterProtocol(fallbackPid, minProtocolPeers, totalProtocolPeers) + + return &client{ + rc: rc, + mgr: mgr, + fallbackMgr: fallbackMgr, + } +} diff --git a/go/worker/storage/p2p/checkpointsync/protocol.go b/go/worker/storage/p2p/checkpointsync/protocol.go new file mode 100644 index 00000000000..89192a8032e --- /dev/null +++ b/go/worker/storage/p2p/checkpointsync/protocol.go @@ -0,0 +1,80 @@ +// Package checkpointsync defines wire protocol together with client/server +// implementations for the checkpoint sync protocol, used for runtime state sync. +package checkpointsync + +import ( + "time" + + "github.com/libp2p/go-libp2p/core" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" + "github.com/oasisprotocol/oasis-core/go/common/node" + "github.com/oasisprotocol/oasis-core/go/common/version" + "github.com/oasisprotocol/oasis-core/go/p2p/peermgmt" + "github.com/oasisprotocol/oasis-core/go/p2p/protocol" + "github.com/oasisprotocol/oasis-core/go/storage/api" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" +) + +// CheckpointSyncProtocolID is a unique protocol identifier for the checkpoint sync protocol. +const CheckpointSyncProtocolID = "checkpointsync" + +// CheckpointSyncProtocolVersion is the supported version of the checkpoint sync protocol. +var CheckpointSyncProtocolVersion = version.Version{Major: 1, Minor: 0, Patch: 0} + +// ProtocolID returns the runtime checkpoint sync protocol ID. +func ProtocolID(chainContext string, runtimeID common.Namespace) core.ProtocolID { + return protocol.NewRuntimeProtocolID(chainContext, runtimeID, CheckpointSyncProtocolID, CheckpointSyncProtocolVersion) +} + +// Constants related to the GetCheckpoints method. 
+const ( + MethodGetCheckpoints = "GetCheckpoints" +) + +// GetCheckpointsRequest is a GetCheckpoints request. +type GetCheckpointsRequest struct { + Version uint16 `json:"version"` +} + +// GetCheckpointsResponse is a response to a GetCheckpoints request. +type GetCheckpointsResponse struct { + Checkpoints []*checkpoint.Metadata `json:"checkpoints,omitempty"` +} + +// Constants related to the GetCheckpointChunk method. +const ( + MethodGetCheckpointChunk = "GetCheckpointChunk" + MaxGetCheckpointChunkResponseTime = 1 * time.Minute +) + +// GetCheckpointChunkRequest is a GetCheckpointChunk request. +type GetCheckpointChunkRequest struct { + Version uint16 `json:"version"` + Root api.Root `json:"root"` + Index uint64 `json:"index"` + Digest hash.Hash `json:"digest"` +} + +// GetCheckpointChunkResponse is a response to a GetCheckpointChunk request. +type GetCheckpointChunkResponse struct { + Chunk []byte `json:"chunk,omitempty"` +} + +func init() { + peermgmt.RegisterNodeHandler(&peermgmt.NodeHandlerBundle{ + ProtocolsFn: func(n *node.Node, chainContext string) []core.ProtocolID { + if !n.HasRoles(node.RoleComputeWorker | node.RoleStorageRPC) { + return []core.ProtocolID{} + } + + protocols := make([]core.ProtocolID, len(n.Runtimes)) + for i, rt := range n.Runtimes { + protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, CheckpointSyncProtocolID, CheckpointSyncProtocolVersion) + } + + return protocols + }, + }) +} diff --git a/go/worker/storage/p2p/checkpointsync/server.go b/go/worker/storage/p2p/checkpointsync/server.go new file mode 100644 index 00000000000..d049dfee0f1 --- /dev/null +++ b/go/worker/storage/p2p/checkpointsync/server.go @@ -0,0 +1,73 @@ +package checkpointsync + +import ( + "bytes" + "context" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/cbor" + "github.com/oasisprotocol/oasis-core/go/p2p/rpc" + "github.com/oasisprotocol/oasis-core/go/storage/api" + 
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" +) + +type service struct { + backend api.Backend +} + +func (s *service) HandleRequest(ctx context.Context, method string, body cbor.RawMessage) (any, error) { + switch method { + case MethodGetCheckpoints: + var rq GetCheckpointsRequest + if err := cbor.Unmarshal(body, &rq); err != nil { + return nil, rpc.ErrBadRequest + } + + return s.handleGetCheckpoints(ctx, &rq) + case MethodGetCheckpointChunk: + var rq GetCheckpointChunkRequest + if err := cbor.Unmarshal(body, &rq); err != nil { + return nil, rpc.ErrBadRequest + } + + return s.handleGetCheckpointChunk(ctx, &rq) + default: + return nil, rpc.ErrMethodNotSupported + } +} + +func (s *service) handleGetCheckpoints(ctx context.Context, request *GetCheckpointsRequest) (*GetCheckpointsResponse, error) { + cps, err := s.backend.GetCheckpoints(ctx, &checkpoint.GetCheckpointsRequest{ + Version: request.Version, + }) + if err != nil { + return nil, err + } + + return &GetCheckpointsResponse{ + Checkpoints: cps, + }, nil +} + +func (s *service) handleGetCheckpointChunk(ctx context.Context, request *GetCheckpointChunkRequest) (*GetCheckpointChunkResponse, error) { + // TODO: Use stream resource manager to track buffer use. + var buf bytes.Buffer + err := s.backend.GetCheckpointChunk(ctx, &checkpoint.ChunkMetadata{ + Version: request.Version, + Root: request.Root, + Index: request.Index, + Digest: request.Digest, + }, &buf) + if err != nil { + return nil, err + } + + return &GetCheckpointChunkResponse{ + Chunk: buf.Bytes(), + }, nil +} + +// NewServer creates a new checkpoints protocol server. 
+func NewServer(chainContext string, runtimeID common.Namespace, backend api.Backend) rpc.Server { + return rpc.NewServer(ProtocolID(chainContext, runtimeID), &service{backend}) +} diff --git a/go/worker/storage/p2p/diffsync/client.go b/go/worker/storage/p2p/diffsync/client.go new file mode 100644 index 00000000000..75be29d30c8 --- /dev/null +++ b/go/worker/storage/p2p/diffsync/client.go @@ -0,0 +1,85 @@ +package diffsync + +import ( + "context" + + "github.com/libp2p/go-libp2p/core" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/p2p/protocol" + "github.com/oasisprotocol/oasis-core/go/p2p/rpc" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy" +) + +const ( + // minProtocolPeers is the minimum number of peers from the registry we want to have connected + // for diff sync protocol. + minProtocolPeers = 5 + + // totalProtocolPeers is the number of peers we want to have connected for diff sync protocol. + totalProtocolPeers = 10 +) + +// Client is a diff sync protocol client. +type Client interface { + // GetDiff requests a write log of entries that must be applied to get from the first given root + // to the second one. + GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, rpc.PeerFeedback, error) + + // IsReady is true when protocol client is aware of at least one remote peer. 
+ IsReady() bool +} + +type client struct { + rc rpc.Client + mgr rpc.PeerManager + fallbackMgr rpc.PeerManager +} + +func (c *client) GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, rpc.PeerFeedback, error) { + var rsp GetDiffResponse + pf, err := c.rc.CallOne(ctx, c.getBestPeers(), MethodGetDiff, request, &rsp, + rpc.WithMaxPeerResponseTime(MaxGetDiffResponseTime), + ) + if err != nil { + return nil, nil, err + } + return &rsp, pf, nil +} + +func (c *client) getBestPeers(opts ...rpc.BestPeersOption) []core.PeerID { + return append(c.mgr.GetBestPeers(opts...), c.fallbackMgr.GetBestPeers(opts...)...) +} + +func (c *client) IsReady() bool { + return len(c.getBestPeers()) > 0 +} + +// NewClient creates a new diff sync protocol client. +// +// Previously, it was part of the storage sync protocol. To enable seamless rolling +// upgrades of the network, this client has a fallback to the old legacy protocol. +// The new protocol is prioritized. +// +// Finally, it ensures underlying p2p service starts tracking protocol peers +// for both new and legacy protocol. +func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client { + pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, DiffSyncProtocolID, DiffSyncProtocolVersion) + fallbackPid := synclegacy.GetStorageSyncProtocolID(chainContext, runtimeID) + rc := rpc.NewClient(p2p.Host(), pid, fallbackPid) + mgr := rpc.NewPeerManager(p2p, pid) + rc.RegisterListener(mgr) + + // Fallback protocol requires a separate manager to manage peers that also support legacy protocol. 
+ fallbackMgr := rpc.NewPeerManager(p2p, fallbackPid) + rc.RegisterListener(fallbackMgr) + + p2p.RegisterProtocol(pid, minProtocolPeers, totalProtocolPeers) + p2p.RegisterProtocol(fallbackPid, minProtocolPeers, totalProtocolPeers) + + return &client{ + rc: rc, + mgr: mgr, + fallbackMgr: fallbackMgr, + } +} diff --git a/go/worker/storage/p2p/diffsync/protocol.go b/go/worker/storage/p2p/diffsync/protocol.go new file mode 100644 index 00000000000..c53304d5666 --- /dev/null +++ b/go/worker/storage/p2p/diffsync/protocol.go @@ -0,0 +1,61 @@ +// Package diffsync defines wire protocol together with client/server +// implementations for the diff sync protocol, used for runtime block sync. +package diffsync + +import ( + "time" + + "github.com/libp2p/go-libp2p/core" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/node" + "github.com/oasisprotocol/oasis-core/go/common/version" + "github.com/oasisprotocol/oasis-core/go/p2p/peermgmt" + "github.com/oasisprotocol/oasis-core/go/p2p/protocol" + "github.com/oasisprotocol/oasis-core/go/storage/api" +) + +// DiffSyncProtocolID is a unique protocol identifier for the diff sync protocol. +const DiffSyncProtocolID = "diffsync" + +// DiffSyncProtocolVersion is the supported version of the diff sync protocol. +var DiffSyncProtocolVersion = version.Version{Major: 1, Minor: 0, Patch: 0} + +// ProtocolID returns the runtime diff sync protocol ID. +func ProtocolID(chainContext string, runtimeID common.Namespace) core.ProtocolID { + return protocol.NewRuntimeProtocolID(chainContext, runtimeID, DiffSyncProtocolID, DiffSyncProtocolVersion) +} + +// Constants related to the GetDiff method. +const ( + MethodGetDiff = "GetDiff" + MaxGetDiffResponseTime = 15 * time.Second +) + +// GetDiffRequest is a GetDiff request. +type GetDiffRequest struct { + StartRoot api.Root `json:"start_root"` + EndRoot api.Root `json:"end_root"` +} + +// GetDiffResponse is a response to a GetDiff request. 
+type GetDiffResponse struct { + WriteLog api.WriteLog `json:"write_log,omitempty"` +} + +func init() { + peermgmt.RegisterNodeHandler(&peermgmt.NodeHandlerBundle{ + ProtocolsFn: func(n *node.Node, chainContext string) []core.ProtocolID { + if !n.HasRoles(node.RoleComputeWorker | node.RoleStorageRPC) { + return []core.ProtocolID{} + } + + protocols := make([]core.ProtocolID, len(n.Runtimes)) + for i, rt := range n.Runtimes { + protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, DiffSyncProtocolID, DiffSyncProtocolVersion) + } + + return protocols + }, + }) +} diff --git a/go/worker/storage/p2p/diffsync/server.go b/go/worker/storage/p2p/diffsync/server.go new file mode 100644 index 00000000000..8520dea1a95 --- /dev/null +++ b/go/worker/storage/p2p/diffsync/server.go @@ -0,0 +1,61 @@ +package diffsync + +import ( + "context" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/cbor" + "github.com/oasisprotocol/oasis-core/go/p2p/rpc" + "github.com/oasisprotocol/oasis-core/go/storage/api" +) + +type service struct { + backend api.Backend +} + +func (s *service) HandleRequest(ctx context.Context, method string, body cbor.RawMessage) (any, error) { + switch method { + case MethodGetDiff: + var rq GetDiffRequest + if err := cbor.Unmarshal(body, &rq); err != nil { + return nil, rpc.ErrBadRequest + } + + return s.handleGetDiff(ctx, &rq) + default: + return nil, rpc.ErrMethodNotSupported + } +} + +func (s *service) handleGetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, error) { + it, err := s.backend.GetDiff(ctx, &api.GetDiffRequest{ + StartRoot: request.StartRoot, + EndRoot: request.EndRoot, + }) + if err != nil { + return nil, err + } + + var rsp GetDiffResponse + for { + more, err := it.Next() + if err != nil { + return nil, err + } + if !more { + break + } + + chunk, err := it.Value() + if err != nil { + return nil, err + } + rsp.WriteLog = append(rsp.WriteLog, chunk) + } + return &rsp, 
nil +} + +// NewServer creates a new storage diff protocol server. +func NewServer(chainContext string, runtimeID common.Namespace, backend api.Backend) rpc.Server { + return rpc.NewServer(ProtocolID(chainContext, runtimeID), &service{backend}) +} diff --git a/go/worker/storage/p2p/sync_test/interoperable_test.go b/go/worker/storage/p2p/sync_test/interoperable_test.go new file mode 100644 index 00000000000..0f5a7a92af4 --- /dev/null +++ b/go/worker/storage/p2p/sync_test/interoperable_test.go @@ -0,0 +1,418 @@ +package sync_test + +import ( + "context" + "fmt" + "io" + "net" + "os" + "path" + "strconv" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" + "github.com/oasisprotocol/oasis-core/go/common/crypto/signature/signers/memory" + "github.com/oasisprotocol/oasis-core/go/common/identity" + "github.com/oasisprotocol/oasis-core/go/common/persistent" + "github.com/oasisprotocol/oasis-core/go/config" + "github.com/oasisprotocol/oasis-core/go/p2p" + p2pApi "github.com/oasisprotocol/oasis-core/go/p2p/api" + "github.com/oasisprotocol/oasis-core/go/storage/api" + storageApi "github.com/oasisprotocol/oasis-core/go/storage/api" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/node" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/syncer" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/writelog" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/diffsync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy" +) + +var ( + chainContext = "test_chain_context" + runtimeID = common.NewTestNamespaceFromSeed([]byte("test namespace"), 0) +) + +// TestStorageSync test interoperability of storage sync P2P protocols. 
+// +// For context storage sync protocol was split into two protocols that are +// semantically equivalent to the respective subset of the legacy protocol. +// +// This test checks for backward and forward compatibility between legacy +// and new clients, with respect to legacy and new protocol servers. +func TestStorageSync(t *testing.T) { + require := require.New(t) + + dataDir, err := os.MkdirTemp("", "oasis-worker-storage-p2p-sync_test") + require.NoError(err, "Failed to create a temporary directory") + defer os.RemoveAll(dataDir) + + tests := []struct { + name string + legacyHost bool + peerKind peerKind + err error + }{ + { + name: "Legacy host client with legacy peer", + legacyHost: true, + peerKind: legacy, + }, + { + name: "Legacy host client with new peer (all protocols)", + legacyHost: true, + peerKind: all, + }, + { + name: "New host client with legacy peer", + legacyHost: false, + peerKind: legacy, + }, + { + name: "New host client with new peer (all protocols)", + legacyHost: false, + peerKind: all, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + test(t, dataDir, tc.legacyHost, tc.peerKind) + }) + } +} + +func test(t *testing.T, dataDir string, legacyHost bool, peerKind peerKind) { + backend := &backendMock{} + + peer1, clean1 := mustStartNewPeer(t, dataDir, 1, backend, none) + defer clean1() + + peer2, clean2 := mustStartNewPeer(t, dataDir, 2, backend, peerKind) + defer clean2() + + err := peer1.Host().Connect(context.Background(), peer.AddrInfo{ + ID: peer2.Host().ID(), + Addrs: peer2.Host().Addrs(), + }) + require.NoError(t, err, "Connecting host to peer") + + ctx, cancel := context.WithTimeout(t.Context(), 1*time.Minute) + defer cancel() + + switch legacyHost { + case true: + testLegacyHostClient(ctx, t, peer1, backend) + default: + testNewClients(ctx, t, peer1, backend) + } +} + +func testLegacyHostClient(ctx context.Context, t *testing.T, host p2pApi.Service, backend storageApi.Backend) { + require := 
require.New(t) + + client := synclegacy.NewClient(host, chainContext, runtimeID) + err := waitReady(ctx, client.IsReady) + require.NoError(err, "Wait for p2p client readiness") + // Test diff part of the storagesync protocol. + rsp, _, err := client.GetDiff(ctx, &synclegacy.GetDiffRequest{}) + require.NoError(err, "Fetch storage diff from p2p") + + err = assertEqualGetDiffResponse(ctx, backend, rsp.WriteLog) + require.NoError(err, "Assert expected storage diff response") + + // Test checkpoints part of the storagesync protocol. + cps, err := client.GetCheckpoints(ctx, &synclegacy.GetCheckpointsRequest{ + Version: 1, + }) + require.NoError(err, "Fetch checkpoints from p2p") + want, err := backend.GetCheckpoints(ctx, &checkpoint.GetCheckpointsRequest{ + Version: 1, + Namespace: runtimeID, + }) + require.NoError(err, "Fetch expected storage diff from backend") + getMeta := func(cp *synclegacy.Checkpoint) *checkpoint.Metadata { return cp.Metadata } + err = assertEqualCheckpoints(cps, want, getMeta) + require.NoError(err, "Assert expected checkpoints response") +} + +func testNewClients(ctx context.Context, t *testing.T, host p2pApi.Service, backend storageApi.Backend) { + require := require.New(t) + + // Test diff sync protocol. + diffClient := diffsync.NewClient(host, chainContext, runtimeID) + err := waitReady(ctx, diffClient.IsReady) + require.NoError(err, "Wait for p2p client readiness") + + rsp2, _, err := diffClient.GetDiff(ctx, &diffsync.GetDiffRequest{}) + require.NoError(err, "Fetch storage diff from p2p") + err = assertEqualGetDiffResponse(ctx, backend, rsp2.WriteLog) + require.NoError(err, "Assert expected storage diff response") + + // Test checkpoint sync protocol. 
+ cpsClient := checkpointsync.NewClient(host, chainContext, runtimeID) + err = waitReady(ctx, cpsClient.IsReady) + require.NoError(err, "Wait for p2p client readiness") + + cps, err := cpsClient.GetCheckpoints(ctx, &checkpointsync.GetCheckpointsRequest{ + Version: 1, + }) + require.NoError(err, "Fetch checkpoints from p2p") + want, err := backend.GetCheckpoints(ctx, &checkpoint.GetCheckpointsRequest{ + Version: 1, + Namespace: runtimeID, + }) + require.NoError(err, "Fetch expected storage diff from backend") + getMeta := func(cp *checkpointsync.Checkpoint) *checkpoint.Metadata { return cp.Metadata } + err = assertEqualCheckpoints(cps, want, getMeta) + require.NoError(err, "Assert expected checkpoints response") +} + +func assertEqualGetDiffResponse(ctx context.Context, backend storageApi.Backend, got storageApi.WriteLog) error { + diff, err := backend.GetDiff(ctx, &storageApi.GetDiffRequest{}) + if err != nil { + return fmt.Errorf("fetching expected storage diff from backend: %w", err) + } + + want := make(storageApi.WriteLog, 0) + for { + next, err := diff.Next() + if !next { + break + } + if err != nil { + return fmt.Errorf("writelog iterator next: %w", err) + } + val, err := diff.Value() + if err != nil { + return fmt.Errorf("writelog iterator value: %w", err) + } + want = append(want, val) + } + + if !want.Equal(got) { + return fmt.Errorf("writelog not equal") + } + return nil +} + +func assertEqualCheckpoints[C any](cps []C, want []*checkpoint.Metadata, getMeta func(C) *checkpoint.Metadata) error { + if len(cps) != len(want) { + return fmt.Errorf("slice size not equal: got %d, want %d", len(cps), len(want)) + } + for i, cp1 := range cps { + if err := assertEqualCheckpointMeta(getMeta(cp1), want[i]); err != nil { + return fmt.Errorf("checkpoints at index %d not equal: %w", i, err) + } + } + return nil +} + +func assertEqualCheckpointMeta(this, other *checkpoint.Metadata) error { + if this.Version != other.Version { + return fmt.Errorf("version not equal") + } + 
if this.Root.Equal(&other.Root) { + return fmt.Errorf("root not equal") + } + if len(this.Chunks) != len(other.Chunks) { + return fmt.Errorf("not equal number of chunks") + } + for i, x := range this.Chunks { + if !x.Equal(&other.Chunks[i]) { + return fmt.Errorf("chunk %d not equal", i) + } + } + return nil +} + +type peerKind int + +const ( + legacy peerKind = iota + all + none +) + +func mustStartNewPeer(t *testing.T, dataDir string, id int, backend storageApi.Backend, kind peerKind) (service p2pApi.Service, clean func()) { + var err error + var cleanups []func() + clean = func() { + for i := len(cleanups) - 1; i >= 0; i-- { + cleanups[i]() + } + } + + defer func() { + if err != nil { + clean() + } + }() + + require := require.New(t) + + dataDir = path.Join(dataDir, strconv.Itoa(id)) + err = os.Mkdir(dataDir, 0o700) + require.NoError(err, "Failed to create a temporary directory") + cleanups = append(cleanups, func() { os.RemoveAll(dataDir) }) + + identity, err := identity.LoadOrGenerate(dataDir, memory.NewFactory()) + require.NoError(err, "Failed to generate a new identity") + + store, err := persistent.NewCommonStore(dataDir) + require.NoError(err, "Failed to generate persistent common store") + cleanups = append(cleanups, func() { store.Close() }) + + port, err := getAvailablePort() + require.NoError(err) + // Avoid this pattern. Ideally p2p service should be refactored to not use + // global config. 
// waitReady polls isReady until it reports true, the given context is done,
// or one minute has elapsed, whichever comes first.
//
// It returns nil once isReady reports true and the context error otherwise.
// The context is checked before every poll, so isReady is never called with
// an already finished context. Polls are spaced by an exponential backoff
// that starts at 20ms and is capped at one second; the wait between polls is
// interrupted as soon as the context is done.
func waitReady(ctx context.Context, isReady func() bool) error {
	const (
		initialBackoff = 20 * time.Millisecond
		maxBackoff     = time.Second
	)

	ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
	defer cancel()

	backoff := initialBackoff
	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		if isReady() {
			return nil
		}

		// Wait for the backoff, but give up immediately on cancellation
		// rather than sleeping through it.
		timer := time.NewTimer(backoff)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}

		// Cap the backoff so readiness is noticed promptly on long waits.
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}
hashFromBytes(data []byte) hash.Hash { + var hash hash.Hash + hash.FromBytes(data) + return hash +} + +var ( + cpVersion uint16 + root node.Root + hash1 hash.Hash = hashFromBytes([]byte("hash1")) + hash2 hash.Hash = hashFromBytes([]byte("hash2")) +) + +func (bm *backendMock) GetCheckpoints(_ context.Context, request *checkpoint.GetCheckpointsRequest) ([]*checkpoint.Metadata, error) { + cpVersion = request.Version + root = node.Root{ + Namespace: request.Namespace, + Version: 1, + Type: api.RootTypeState, + Hash: hashFromBytes([]byte("root has")), + } + cp := &checkpoint.Metadata{ + Version: cpVersion, + Root: root, + Chunks: []hash.Hash{hash1, hash2}, + } + + return []*checkpoint.Metadata{cp}, nil +} + +func (bm *backendMock) GetCheckpointChunk(_ context.Context, chunk *checkpoint.ChunkMetadata, w io.Writer) error { + if !chunk.Root.Equal(&root) || chunk.Version != cpVersion || chunk.Index > 1 { + return fmt.Errorf("invalid chunk metadata") + } + + switch chunk.Index { + case 0: + if !chunk.Digest.Equal(&hash1) { + return fmt.Errorf("invalid chunk metada") + } + if _, err := w.Write([]byte("hash1")); err != nil { + return err + } + case 1: + if !chunk.Digest.Equal(&hash2) { + return fmt.Errorf("invalid chunk metada") + } + if _, err := w.Write([]byte("hash2")); err != nil { + return err + } + } + + return nil +} + +func (bm *backendMock) GetDiff(_ context.Context, request *storageApi.GetDiffRequest) (storageApi.WriteLogIterator, error) { + items := writelog.WriteLog{ + writelog.LogEntry{Key: []byte("startHash"), Value: []byte(request.StartRoot.Hash.String())}, + writelog.LogEntry{Key: []byte("endHash"), Value: []byte(request.EndRoot.Hash.String())}, + } + return writelog.NewStaticIterator(items), nil +} + +func (bm *backendMock) Cleanup() { + panic("not supported") +} + +func (bm *backendMock) Initialized() <-chan struct{} { + panic("not supported") +} diff --git a/go/worker/storage/p2p/sync/client.go b/go/worker/storage/p2p/synclegacy/client.go similarity index 
89% rename from go/worker/storage/p2p/sync/client.go rename to go/worker/storage/p2p/synclegacy/client.go index fdb8ed7379d..96f5be2ae3c 100644 --- a/go/worker/storage/p2p/sync/client.go +++ b/go/worker/storage/p2p/synclegacy/client.go @@ -1,4 +1,4 @@ -package sync +package synclegacy import ( "context" @@ -36,6 +36,9 @@ type Client interface { request *GetCheckpointChunkRequest, cp *Checkpoint, ) (*GetCheckpointChunkResponse, rpc.PeerFeedback, error) + + // IsReady is true when protocol client is aware of at least one remote peer. + IsReady() bool } // Checkpoint contains checkpoint metadata together with peer information. @@ -124,6 +127,15 @@ func (c *client) GetCheckpointChunk( return &rsp, pf, nil } +// GetStorageSyncProtocolID returns unique storage sync protocol id for the specified chain context and runtime id. +func GetStorageSyncProtocolID(chainContext string, runtimeID common.Namespace) core.ProtocolID { + return protocol.NewRuntimeProtocolID(chainContext, runtimeID, StorageSyncProtocolID, StorageSyncProtocolVersion) +} + +func (c *client) IsReady() bool { + return len(c.mgrC.GetBestPeers()) > 0 +} + // NewClient creates a new storage sync protocol client. func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client { // Use two separate clients and managers for the same protocol. This is to make sure that peers diff --git a/go/worker/storage/p2p/sync/protocol.go b/go/worker/storage/p2p/synclegacy/protocol.go similarity index 90% rename from go/worker/storage/p2p/sync/protocol.go rename to go/worker/storage/p2p/synclegacy/protocol.go index 7dca895e014..e32d764bca6 100644 --- a/go/worker/storage/p2p/sync/protocol.go +++ b/go/worker/storage/p2p/synclegacy/protocol.go @@ -1,4 +1,10 @@ -package sync +// Package synclegacy defines wire protocol together with client/server +// implementations for the legacy storage sync protocol, used for runtime block sync. +// +// The protocol was split into storage diff and checkpoints protocol. 
+// +// TODO: Remove it: https://github.com/oasisprotocol/oasis-core/issues/6261 +package synclegacy import ( "time" diff --git a/go/worker/storage/p2p/sync/server.go b/go/worker/storage/p2p/synclegacy/server.go similarity index 99% rename from go/worker/storage/p2p/sync/server.go rename to go/worker/storage/p2p/synclegacy/server.go index 22b6731465a..a09c342bcaf 100644 --- a/go/worker/storage/p2p/sync/server.go +++ b/go/worker/storage/p2p/synclegacy/server.go @@ -1,4 +1,4 @@ -package sync +package synclegacy import ( "bytes"