Skip to content

Commit fc405de

Browse files
committed
go/worker/storage/p2p: Split storage sync protocol
1 parent 4213d9a commit fc405de

6 files changed

Lines changed: 458 additions & 0 deletions

File tree

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package checkpointsync
2+
3+
import (
4+
"context"
5+
6+
"github.com/libp2p/go-libp2p/core"
7+
8+
"github.com/oasisprotocol/oasis-core/go/common"
9+
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
10+
"github.com/oasisprotocol/oasis-core/go/p2p/protocol"
11+
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
12+
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
13+
)
14+
15+
const (
16+
// minProtocolPeers is the minimum number of peers from the registry we want to have connected
17+
// for checkpoint sync protocol.
18+
minProtocolPeers = 5
19+
20+
// totalProtocolPeers is the number of peers we want to have connected for checkpoint sync protocol.
21+
totalProtocolPeers = 10
22+
)
23+
24+
// Client is a checkpoint sync protocol client.
25+
type Client interface {
26+
// GetCheckpoints returns a list of checkpoint metadata for all known checkpoints.
27+
GetCheckpoints(ctx context.Context, request *GetCheckpointsRequest) ([]*Checkpoint, error)
28+
29+
// GetCheckpointChunk requests a specific checkpoint chunk.
30+
GetCheckpointChunk(
31+
ctx context.Context,
32+
request *GetCheckpointChunkRequest,
33+
cp *Checkpoint,
34+
) (*GetCheckpointChunkResponse, rpc.PeerFeedback, error)
35+
}
36+
37+
// Checkpoint contains checkpoint metadata together with peer information.
38+
type Checkpoint struct {
39+
*checkpoint.Metadata
40+
41+
// Peers are the feedback structures of all the peers that have advertised this checkpoint.
42+
Peers []rpc.PeerFeedback
43+
}
44+
45+
type client struct {
46+
rc rpc.Client
47+
mgr rpc.PeerManager
48+
}
49+
50+
func (c *client) GetCheckpoints(ctx context.Context, request *GetCheckpointsRequest) ([]*Checkpoint, error) {
51+
var rsp GetCheckpointsResponse
52+
rsps, pfs, err := c.rc.CallMulti(ctx, c.mgr.GetBestPeers(), MethodGetCheckpoints, request, rsp)
53+
if err != nil {
54+
return nil, err
55+
}
56+
57+
// Combine deduplicated results into a single result.
58+
var checkpoints []*Checkpoint
59+
cps := make(map[hash.Hash]*Checkpoint)
60+
for i, peerRsp := range rsps {
61+
peerCps := peerRsp.(*GetCheckpointsResponse).Checkpoints
62+
63+
for _, cpMeta := range peerCps {
64+
h := cpMeta.EncodedHash()
65+
cp := cps[h]
66+
if cp == nil {
67+
cp = &Checkpoint{
68+
Metadata: cpMeta,
69+
}
70+
cps[h] = cp
71+
checkpoints = append(checkpoints, cp)
72+
}
73+
cp.Peers = append(cp.Peers, pfs[i])
74+
}
75+
76+
// Record success for a peer if it returned at least one checkpoint.
77+
if len(peerCps) > 0 {
78+
pfs[i].RecordSuccess()
79+
}
80+
}
81+
return checkpoints, nil
82+
}
83+
84+
func (c *client) GetCheckpointChunk(
85+
ctx context.Context,
86+
request *GetCheckpointChunkRequest,
87+
cp *Checkpoint,
88+
) (*GetCheckpointChunkResponse, rpc.PeerFeedback, error) {
89+
var opts []rpc.BestPeersOption
90+
// When a checkpoint is passed, we limit requests to only those peers that actually advertised
91+
// having the checkpoint in question to avoid needless requests.
92+
if cp != nil {
93+
peers := make([]core.PeerID, 0, len(cp.Peers))
94+
for _, pf := range cp.Peers {
95+
peers = append(peers, pf.PeerID())
96+
}
97+
opts = append(opts, rpc.WithLimitPeers(peers))
98+
}
99+
100+
var rsp GetCheckpointChunkResponse
101+
pf, err := c.rc.CallOne(ctx, c.mgr.GetBestPeers(opts...), MethodGetCheckpointChunk, request, &rsp,
102+
rpc.WithMaxPeerResponseTime(MaxGetCheckpointChunkResponseTime),
103+
)
104+
if err != nil {
105+
return nil, nil, err
106+
}
107+
return &rsp, pf, nil
108+
}
109+
110+
// NewClient creates a new checkpoint sync protocol client.
111+
//
112+
// Moreover, it ensures underlying p2p service starts tracking protocol peers.
113+
func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client {
114+
pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, CheckpointSyncProtocolID, CheckpointSyncProtocolVersion)
115+
rc := rpc.NewClient(p2p.Host(), pid)
116+
mgr := rpc.NewPeerManager(p2p, pid)
117+
rc.RegisterListener(mgr)
118+
119+
p2p.RegisterProtocol(pid, minProtocolPeers, totalProtocolPeers)
120+
121+
return &client{
122+
rc: rc,
123+
mgr: mgr,
124+
}
125+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Package checkpointsync defines wire protocol together with client/server
2+
// implementations for the checkpoint sync protocol, used for runtime state sync.
3+
package checkpointsync
4+
5+
import (
6+
"time"
7+
8+
"github.com/libp2p/go-libp2p/core"
9+
10+
"github.com/oasisprotocol/oasis-core/go/common"
11+
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
12+
"github.com/oasisprotocol/oasis-core/go/common/node"
13+
"github.com/oasisprotocol/oasis-core/go/common/version"
14+
"github.com/oasisprotocol/oasis-core/go/p2p/peermgmt"
15+
"github.com/oasisprotocol/oasis-core/go/p2p/protocol"
16+
"github.com/oasisprotocol/oasis-core/go/storage/api"
17+
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
18+
)
19+
20+
// CheckpointSyncProtocolID is a unique protocol identifier for the checkpoint sync protocol.
21+
const CheckpointSyncProtocolID = "checkpointsync"
22+
23+
// CheckpointSyncProtocolVersion is the supported version of the checkpoint sync protocol.
24+
var CheckpointSyncProtocolVersion = version.Version{Major: 1, Minor: 0, Patch: 0}
25+
26+
// ProtocolID returns the runtime checkpoint sync protocol ID.
27+
func ProtocolID(chainContext string, runtimeID common.Namespace) core.ProtocolID {
28+
return protocol.NewRuntimeProtocolID(chainContext, runtimeID, CheckpointSyncProtocolID, CheckpointSyncProtocolVersion)
29+
}
30+
31+
// Constants related to the GetCheckpoints method.
32+
const (
33+
MethodGetCheckpoints = "GetCheckpoints"
34+
)
35+
36+
// GetCheckpointsRequest is a GetCheckpoints request.
37+
type GetCheckpointsRequest struct {
38+
Version uint16 `json:"version"`
39+
}
40+
41+
// GetCheckpointsResponse is a response to a GetCheckpoints request.
42+
type GetCheckpointsResponse struct {
43+
Checkpoints []*checkpoint.Metadata `json:"checkpoints,omitempty"`
44+
}
45+
46+
// Constants related to the GetCheckpointChunk method.
47+
const (
48+
MethodGetCheckpointChunk = "GetCheckpointChunk"
49+
MaxGetCheckpointChunkResponseTime = time.Minute
50+
)
51+
52+
// GetCheckpointChunkRequest is a GetCheckpointChunk request.
53+
type GetCheckpointChunkRequest struct {
54+
Version uint16 `json:"version"`
55+
Root api.Root `json:"root"`
56+
Index uint64 `json:"index"`
57+
Digest hash.Hash `json:"digest"`
58+
}
59+
60+
// GetCheckpointChunkResponse is a response to a GetCheckpointChunk request.
61+
type GetCheckpointChunkResponse struct {
62+
Chunk []byte `json:"chunk,omitempty"`
63+
}
64+
65+
func init() {
66+
peermgmt.RegisterNodeHandler(&peermgmt.NodeHandlerBundle{
67+
ProtocolsFn: func(n *node.Node, chainContext string) []core.ProtocolID {
68+
if !n.HasRoles(node.RoleComputeWorker | node.RoleStorageRPC) {
69+
return []core.ProtocolID{}
70+
}
71+
72+
protocols := make([]core.ProtocolID, len(n.Runtimes))
73+
for i, rt := range n.Runtimes {
74+
protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, CheckpointSyncProtocolID, CheckpointSyncProtocolVersion)
75+
}
76+
77+
return protocols
78+
},
79+
})
80+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package checkpointsync
2+
3+
import (
4+
"bytes"
5+
"context"
6+
7+
"github.com/oasisprotocol/oasis-core/go/common"
8+
"github.com/oasisprotocol/oasis-core/go/common/cbor"
9+
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
10+
"github.com/oasisprotocol/oasis-core/go/storage/api"
11+
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
12+
)
13+
14+
type service struct {
15+
backend api.Backend
16+
}
17+
18+
func (s *service) HandleRequest(ctx context.Context, method string, body cbor.RawMessage) (any, error) {
19+
switch method {
20+
case MethodGetCheckpoints:
21+
var rq GetCheckpointsRequest
22+
if err := cbor.Unmarshal(body, &rq); err != nil {
23+
return nil, rpc.ErrBadRequest
24+
}
25+
26+
return s.handleGetCheckpoints(ctx, &rq)
27+
case MethodGetCheckpointChunk:
28+
var rq GetCheckpointChunkRequest
29+
if err := cbor.Unmarshal(body, &rq); err != nil {
30+
return nil, rpc.ErrBadRequest
31+
}
32+
33+
return s.handleGetCheckpointChunk(ctx, &rq)
34+
default:
35+
return nil, rpc.ErrMethodNotSupported
36+
}
37+
}
38+
39+
func (s *service) handleGetCheckpoints(ctx context.Context, request *GetCheckpointsRequest) (*GetCheckpointsResponse, error) {
40+
cps, err := s.backend.GetCheckpoints(ctx, &checkpoint.GetCheckpointsRequest{
41+
Version: request.Version,
42+
})
43+
if err != nil {
44+
return nil, err
45+
}
46+
47+
return &GetCheckpointsResponse{
48+
Checkpoints: cps,
49+
}, nil
50+
}
51+
52+
func (s *service) handleGetCheckpointChunk(ctx context.Context, request *GetCheckpointChunkRequest) (*GetCheckpointChunkResponse, error) {
53+
// Consider using stream resource manager to track buffer use.
54+
var buf bytes.Buffer
55+
err := s.backend.GetCheckpointChunk(ctx, &checkpoint.ChunkMetadata{
56+
Version: request.Version,
57+
Root: request.Root,
58+
Index: request.Index,
59+
Digest: request.Digest,
60+
}, &buf)
61+
if err != nil {
62+
return nil, err
63+
}
64+
65+
return &GetCheckpointChunkResponse{
66+
Chunk: buf.Bytes(),
67+
}, nil
68+
}
69+
70+
// NewServer creates a new checkpoint sync protocol server.
71+
func NewServer(chainContext string, runtimeID common.Namespace, backend api.Backend) rpc.Server {
72+
return rpc.NewServer(ProtocolID(chainContext, runtimeID), &service{backend})
73+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package diffsync
2+
3+
import (
4+
"context"
5+
6+
"github.com/oasisprotocol/oasis-core/go/common"
7+
"github.com/oasisprotocol/oasis-core/go/p2p/protocol"
8+
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
9+
)
10+
11+
const (
12+
// minProtocolPeers is the minimum number of peers from the registry we want to have connected
13+
// for diff sync protocol.
14+
minProtocolPeers = 5
15+
16+
// totalProtocolPeers is the number of peers we want to have connected for diff sync protocol.
17+
totalProtocolPeers = 10
18+
)
19+
20+
// Client is a diff sync protocol client.
21+
type Client interface {
22+
// GetDiff requests a write log of entries that must be applied to get from the first given root
23+
// to the second one.
24+
GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, rpc.PeerFeedback, error)
25+
}
26+
27+
type client struct {
28+
rc rpc.Client
29+
mgr rpc.PeerManager
30+
}
31+
32+
func (c *client) GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, rpc.PeerFeedback, error) {
33+
var rsp GetDiffResponse
34+
pf, err := c.rc.CallOne(ctx, c.mgr.GetBestPeers(), MethodGetDiff, request, &rsp,
35+
rpc.WithMaxPeerResponseTime(MaxGetDiffResponseTime),
36+
)
37+
if err != nil {
38+
return nil, nil, err
39+
}
40+
return &rsp, pf, nil
41+
}
42+
43+
// NewClient creates a new diff sync protocol client.
44+
//
45+
// Moreover, it ensures underlying p2p service starts tracking protocol peers.
46+
func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client {
47+
pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, DiffSyncProtocolID, DiffSyncProtocolVersion)
48+
rc := rpc.NewClient(p2p.Host(), pid)
49+
mgr := rpc.NewPeerManager(p2p, pid)
50+
rc.RegisterListener(mgr)
51+
52+
p2p.RegisterProtocol(pid, minProtocolPeers, totalProtocolPeers)
53+
54+
return &client{
55+
rc: rc,
56+
mgr: mgr,
57+
}
58+
}

0 commit comments

Comments
 (0)