Skip to content

Commit c11befc

Browse files
committed
fixup! feat(libp2p): add v1.0.0 network compatibility
1 parent e3eacb6 commit c11befc

4 files changed

Lines changed: 129 additions & 40 deletions

File tree

impl/graphsync_test.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,9 @@ var protocolsForTest = map[string]struct {
6262
host2Protocols []protocol.ID
6363
}{
6464
"(v1.1 -> v1.1)": {nil, nil},
65-
/*
66-
"(v1.0 -> v1.1)": {[]protocol.ID{network.ProtocolGraphsync_1_0_0}, nil},
67-
"(v1.1 -> v1.0)": {nil, []protocol.ID{network.ProtocolGraphsync_1_0_0}},
68-
"(v1.0 -> v1.0)": {[]protocol.ID{network.ProtocolGraphsync_1_0_0}, []protocol.ID{network.ProtocolGraphsync_1_0_0}},
69-
*/
65+
"(v1.0 -> v1.1)": {[]protocol.ID{network.ProtocolGraphsync_1_0_0}, nil},
66+
"(v1.1 -> v1.0)": {nil, []protocol.ID{network.ProtocolGraphsync_1_0_0}},
67+
"(v1.0 -> v1.0)": {[]protocol.ID{network.ProtocolGraphsync_1_0_0}, []protocol.ID{network.ProtocolGraphsync_1_0_0}},
7068
}
7169

7270
func TestMakeRequestToNetwork(t *testing.T) {

message/message.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
package message
22

33
import (
4+
"bytes"
5+
"fmt"
46
"io"
7+
"strings"
58

69
blocks "github.com/ipfs/go-block-format"
710
cid "github.com/ipfs/go-cid"
811
"github.com/ipld/go-ipld-prime"
12+
"github.com/ipld/go-ipld-prime/codec/dagjson"
913

1014
"github.com/ipfs/go-graphsync"
1115
pb "github.com/ipfs/go-graphsync/message/pb"
@@ -50,6 +54,25 @@ type GraphSyncRequest struct {
5054
isUpdate bool
5155
}
5256

57+
// String returns a human-readable form of a GraphSyncRequest
58+
func (gsr GraphSyncRequest) String() string {
59+
var buf bytes.Buffer
60+
dagjson.Encode(gsr.selector, &buf)
61+
ext := make([]string, 0)
62+
for s, _ := range gsr.extensions {
63+
ext = append(ext, s)
64+
}
65+
return fmt.Sprintf("GraphSyncRequest<root=%s, selector=%s, priority=%d, id=%s, cancel=%v, update=%v, exts=%s>",
66+
gsr.root.String(),
67+
buf.String(),
68+
gsr.priority,
69+
gsr.id.String(),
70+
gsr.isCancel,
71+
gsr.isUpdate,
72+
strings.Join(ext, "|"),
73+
)
74+
}
75+
5376
// GraphSyncResponse is an struct to capture data on a response sent back
5477
// in a GraphSyncMessage.
5578
type GraphSyncResponse struct {
@@ -58,12 +81,43 @@ type GraphSyncResponse struct {
5881
extensions map[string][]byte
5982
}
6083

84+
// String returns a human-readable form of a GraphSyncResponse
85+
func (gsr GraphSyncResponse) String() string {
86+
ext := make([]string, 0)
87+
for s, _ := range gsr.extensions {
88+
ext = append(ext, s)
89+
}
90+
return fmt.Sprintf("GraphSyncResponse<id=%s, status=%d, exts=%s>",
91+
gsr.requestID.String(),
92+
gsr.status,
93+
strings.Join(ext, "|"),
94+
)
95+
}
96+
97+
// GraphSyncMessage is the internal representation form of a message sent or
98+
// received over the wire
6199
type GraphSyncMessage struct {
62100
requests map[graphsync.RequestID]GraphSyncRequest
63101
responses map[graphsync.RequestID]GraphSyncResponse
64102
blocks map[cid.Cid]blocks.Block
65103
}
66104

105+
// String returns a human-readable (multi-line) form of a GraphSyncMessage and
106+
// its contents
107+
func (gsm GraphSyncMessage) String() string {
108+
cts := make([]string, 0)
109+
for _, req := range gsm.requests {
110+
cts = append(cts, req.String())
111+
}
112+
for _, resp := range gsm.responses {
113+
cts = append(cts, resp.String())
114+
}
115+
for c := range gsm.blocks {
116+
cts = append(cts, fmt.Sprintf("Block<%s>", c.String()))
117+
}
118+
return fmt.Sprintf("GraphSyncMessage<\n\t%s\n>", strings.Join(cts, ",\n\t"))
119+
}
120+
67121
// NewRequest builds a new Graphsync request
68122
func NewRequest(id graphsync.RequestID,
69123
root cid.Cid,
@@ -128,10 +182,13 @@ func newResponse(requestID graphsync.RequestID,
128182
}
129183
}
130184

185+
// Empty returns true if this message has no actionable content
131186
func (gsm GraphSyncMessage) Empty() bool {
132187
return len(gsm.blocks) == 0 && len(gsm.requests) == 0 && len(gsm.responses) == 0
133188
}
134189

190+
// Requests returns a copy of the list of GraphSyncRequests in this
191+
// GraphSyncMessage
135192
func (gsm GraphSyncMessage) Requests() []GraphSyncRequest {
136193
requests := make([]GraphSyncRequest, 0, len(gsm.requests))
137194
for _, request := range gsm.requests {
@@ -140,6 +197,8 @@ func (gsm GraphSyncMessage) Requests() []GraphSyncRequest {
140197
return requests
141198
}
142199

200+
// ResponseCodes returns a list of ResponseStatusCodes contained in the
201+
// responses in this GraphSyncMessage
143202
func (gsm GraphSyncMessage) ResponseCodes() map[graphsync.RequestID]graphsync.ResponseStatusCode {
144203
codes := make(map[graphsync.RequestID]graphsync.ResponseStatusCode, len(gsm.responses))
145204
for id, response := range gsm.responses {
@@ -148,6 +207,8 @@ func (gsm GraphSyncMessage) ResponseCodes() map[graphsync.RequestID]graphsync.Re
148207
return codes
149208
}
150209

210+
// Responses returns a copy of the list of GraphSyncResponses in this
211+
// GraphSyncMessage
151212
func (gsm GraphSyncMessage) Responses() []GraphSyncResponse {
152213
responses := make([]GraphSyncResponse, 0, len(gsm.responses))
153214
for _, response := range gsm.responses {
@@ -156,6 +217,7 @@ func (gsm GraphSyncMessage) Responses() []GraphSyncResponse {
156217
return responses
157218
}
158219

220+
// Blocks returns a copy of the list of Blocks in this GraphSyncMessage
159221
func (gsm GraphSyncMessage) Blocks() []blocks.Block {
160222
bs := make([]blocks.Block, 0, len(gsm.blocks))
161223
for _, block := range gsm.blocks {
@@ -164,6 +226,7 @@ func (gsm GraphSyncMessage) Blocks() []blocks.Block {
164226
return bs
165227
}
166228

229+
// Loggable returns a simplified, single-line log form of this GraphSyncMessage
167230
func (gsm GraphSyncMessage) Loggable() map[string]interface{} {
168231
requests := make([]string, 0, len(gsm.requests))
169232
for _, request := range gsm.requests {
@@ -179,6 +242,7 @@ func (gsm GraphSyncMessage) Loggable() map[string]interface{} {
179242
}
180243
}
181244

245+
// Clone returns a shallow copy of this GraphSyncMessage
182246
func (gsm GraphSyncMessage) Clone() GraphSyncMessage {
183247
requests := make(map[graphsync.RequestID]GraphSyncRequest, len(gsm.requests))
184248
for id, request := range gsm.requests {

message/messagehandler.go

Lines changed: 53 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"encoding/binary"
55
"errors"
66
"io"
7+
"sync"
78

89
blocks "github.com/ipfs/go-block-format"
910
"github.com/ipfs/go-cid"
@@ -13,31 +14,41 @@ import (
1314
"github.com/ipld/go-ipld-prime"
1415
pool "github.com/libp2p/go-buffer-pool"
1516
"github.com/libp2p/go-libp2p-core/network"
17+
"github.com/libp2p/go-libp2p-core/peer"
1618
"github.com/libp2p/go-msgio"
1719
"google.golang.org/protobuf/proto"
1820
)
1921

22+
type v1RequestKey struct {
23+
p peer.ID
24+
id int32
25+
}
26+
2027
type MessageHandler struct {
21-
fromV1Map map[int32]graphsync.RequestID
28+
mapLock sync.Mutex
29+
// each host can have multiple peerIDs, so our integer requestID mapping for
30+
// protocol v1.0.0 needs to be a combo of peerID and requestID
31+
fromV1Map map[v1RequestKey]graphsync.RequestID
2232
toV1Map map[graphsync.RequestID]int32
2333
nextIntId int32
2434
}
2535

36+
// NewMessageHandler instantiates a new MessageHandler instance
2637
func NewMessageHandler() *MessageHandler {
2738
return &MessageHandler{
28-
fromV1Map: make(map[int32]graphsync.RequestID),
39+
fromV1Map: make(map[v1RequestKey]graphsync.RequestID),
2940
toV1Map: make(map[graphsync.RequestID]int32),
3041
}
3142
}
3243

3344
// FromNet can read a network stream to deserialized a GraphSyncMessage
34-
func (mh MessageHandler) FromNet(r io.Reader) (GraphSyncMessage, error) {
45+
func (mh *MessageHandler) FromNet(r io.Reader) (GraphSyncMessage, error) {
3546
reader := msgio.NewVarintReaderSize(r, network.MessageSizeMax)
3647
return mh.FromMsgReader(reader)
3748
}
3849

3950
// FromMsgReader can deserialize a protobuf message into a GraphySyncMessage.
40-
func (mh MessageHandler) FromMsgReader(r msgio.Reader) (GraphSyncMessage, error) {
51+
func (mh *MessageHandler) FromMsgReader(r msgio.Reader) (GraphSyncMessage, error) {
4152
msg, err := r.ReadMsg()
4253
if err != nil {
4354
return GraphSyncMessage{}, err
@@ -54,7 +65,7 @@ func (mh MessageHandler) FromMsgReader(r msgio.Reader) (GraphSyncMessage, error)
5465
}
5566

5667
// FromMsgReaderV1 can deserialize a v1.0.0 protobuf message into a GraphySyncMessage.
57-
func (mh MessageHandler) FromMsgReaderV1(r msgio.Reader) (GraphSyncMessage, error) {
68+
func (mh *MessageHandler) FromMsgReaderV1(p peer.ID, r msgio.Reader) (GraphSyncMessage, error) {
5869
msg, err := r.ReadMsg()
5970
if err != nil {
6071
return GraphSyncMessage{}, err
@@ -67,9 +78,10 @@ func (mh MessageHandler) FromMsgReaderV1(r msgio.Reader) (GraphSyncMessage, erro
6778
return GraphSyncMessage{}, err
6879
}
6980

70-
return mh.newMessageFromProtoV1(&pb)
81+
return mh.newMessageFromProtoV1(p, &pb)
7182
}
7283

84+
// ToProto converts a GraphSyncMessage to its pb.Message equivalent
7385
func (mh *MessageHandler) ToProto(gsm GraphSyncMessage) (*pb.Message, error) {
7486
pbm := new(pb.Message)
7587
pbm.Requests = make([]*pb.Message_Request, 0, len(gsm.requests))
@@ -113,7 +125,11 @@ func (mh *MessageHandler) ToProto(gsm GraphSyncMessage) (*pb.Message, error) {
113125
return pbm, nil
114126
}
115127

116-
func (mh *MessageHandler) ToProtoV1(gsm GraphSyncMessage) (*pb.Message_V1_0_0, error) {
128+
// ToProtoV1 converts a GraphSyncMessage to its pb.Message_V1_0_0 equivalent
129+
func (mh *MessageHandler) ToProtoV1(p peer.ID, gsm GraphSyncMessage) (*pb.Message_V1_0_0, error) {
130+
mh.mapLock.Lock()
131+
defer mh.mapLock.Unlock()
132+
117133
pbm := new(pb.Message_V1_0_0)
118134
pbm.Requests = make([]*pb.Message_V1_0_0_Request, 0, len(gsm.requests))
119135
for _, request := range gsm.requests {
@@ -125,7 +141,7 @@ func (mh *MessageHandler) ToProtoV1(gsm GraphSyncMessage) (*pb.Message_V1_0_0, e
125141
return nil, err
126142
}
127143
}
128-
rid, err := mh.bytesIdToInt(request.id.Bytes())
144+
rid, err := bytesIdToInt(p, mh.fromV1Map, mh.toV1Map, &mh.nextIntId, request.id.Bytes())
129145
if err != nil {
130146
return nil, err
131147
}
@@ -142,7 +158,7 @@ func (mh *MessageHandler) ToProtoV1(gsm GraphSyncMessage) (*pb.Message_V1_0_0, e
142158

143159
pbm.Responses = make([]*pb.Message_V1_0_0_Response, 0, len(gsm.responses))
144160
for _, response := range gsm.responses {
145-
rid, err := mh.bytesIdToInt(response.requestID.Bytes())
161+
rid, err := bytesIdToInt(p, mh.fromV1Map, mh.toV1Map, &mh.nextIntId, response.requestID.Bytes())
146162
if err != nil {
147163
return nil, err
148164
}
@@ -164,6 +180,7 @@ func (mh *MessageHandler) ToProtoV1(gsm GraphSyncMessage) (*pb.Message_V1_0_0, e
164180
return pbm, nil
165181
}
166182

183+
// ToNet writes a GraphSyncMessage in its protobuf format to a writer
167184
func (mh *MessageHandler) ToNet(gsm GraphSyncMessage, w io.Writer) error {
168185
msg, err := mh.ToProto(gsm)
169186
if err != nil {
@@ -183,8 +200,9 @@ func (mh *MessageHandler) ToNet(gsm GraphSyncMessage, w io.Writer) error {
183200
return err
184201
}
185202

186-
func (mh *MessageHandler) ToNetV1(gsm GraphSyncMessage, w io.Writer) error {
187-
msg, err := mh.ToProtoV1(gsm)
203+
// ToNet writes a GraphSyncMessage in its v1.0.0 protobuf format to a writer
204+
func (mh *MessageHandler) ToNetV1(p peer.ID, gsm GraphSyncMessage, w io.Writer) error {
205+
msg, err := mh.ToProtoV1(p, gsm)
188206
if err != nil {
189207
return err
190208
}
@@ -202,31 +220,37 @@ func (mh *MessageHandler) ToNetV1(gsm GraphSyncMessage, w io.Writer) error {
202220
return err
203221
}
204222

205-
func (mh *MessageHandler) bytesIdToInt(id []byte) (int32, error) {
223+
// Maps a []byte slice form of a RequestID (uuid) to an integer format as used
224+
// by a v1 peer. Inverse of intIdToRequestId()
225+
func bytesIdToInt(p peer.ID, fromV1Map map[v1RequestKey]graphsync.RequestID, toV1Map map[graphsync.RequestID]int32, nextIntId *int32, id []byte) (int32, error) {
206226
rid, err := graphsync.ParseRequestID(id)
207227
if err != nil {
208228
return 0, err
209229
}
210-
iid, ok := mh.toV1Map[rid]
230+
iid, ok := toV1Map[rid]
211231
if !ok {
212-
iid = mh.nextIntId
213-
mh.nextIntId++
214-
mh.toV1Map[rid] = iid
215-
mh.fromV1Map[iid] = rid
232+
iid = *nextIntId
233+
*nextIntId++
234+
toV1Map[rid] = iid
235+
fromV1Map[v1RequestKey{p, iid}] = rid
216236
}
217237
return iid, nil
218238
}
219239

220-
func (mh *MessageHandler) intIdToRequestId(id int32) (graphsync.RequestID, error) {
221-
rid, ok := mh.fromV1Map[id]
240+
// Maps an integer form of a RequestID as used by a v1 peer to a native (uuid) form.
241+
// Inverse of bytesIdToInt().
242+
func intIdToRequestId(p peer.ID, fromV1Map map[v1RequestKey]graphsync.RequestID, toV1Map map[graphsync.RequestID]int32, iid int32) (graphsync.RequestID, error) {
243+
key := v1RequestKey{p, iid}
244+
rid, ok := fromV1Map[key]
222245
if !ok {
223246
rid = graphsync.NewRequestID()
224-
mh.fromV1Map[id] = rid
225-
mh.toV1Map[rid] = id
247+
fromV1Map[key] = rid
248+
toV1Map[rid] = iid
226249
}
227250
return rid, nil
228251
}
229252

253+
// Mapping from a pb.Message object to a GraphSyncMessage object
230254
func (mh *MessageHandler) newMessageFromProto(pbm *pb.Message) (GraphSyncMessage, error) {
231255
requests := make(map[graphsync.RequestID]GraphSyncRequest, len(pbm.Requests))
232256
for _, req := range pbm.Requests {
@@ -305,7 +329,12 @@ func (mh *MessageHandler) newMessageFromProto(pbm *pb.Message) (GraphSyncMessage
305329
}, nil
306330
}
307331

308-
func (mh *MessageHandler) newMessageFromProtoV1(pbm *pb.Message_V1_0_0) (GraphSyncMessage, error) {
332+
// Mapping from a pb.Message_V1_0_0 object to a GraphSyncMessage object, including
333+
// RequestID (int / uuid) mapping.
334+
func (mh *MessageHandler) newMessageFromProtoV1(p peer.ID, pbm *pb.Message_V1_0_0) (GraphSyncMessage, error) {
335+
mh.mapLock.Lock()
336+
defer mh.mapLock.Unlock()
337+
309338
requests := make(map[graphsync.RequestID]GraphSyncRequest, len(pbm.Requests))
310339
for _, req := range pbm.Requests {
311340
if req == nil {
@@ -331,7 +360,7 @@ func (mh *MessageHandler) newMessageFromProtoV1(pbm *pb.Message_V1_0_0) (GraphSy
331360
if exts == nil {
332361
exts = make(map[string][]byte)
333362
}
334-
id, err := mh.intIdToRequestId(req.Id)
363+
id, err := intIdToRequestId(p, mh.fromV1Map, mh.toV1Map, req.Id)
335364
if err != nil {
336365
return GraphSyncMessage{}, err
337366
}
@@ -347,7 +376,7 @@ func (mh *MessageHandler) newMessageFromProtoV1(pbm *pb.Message_V1_0_0) (GraphSy
347376
if exts == nil {
348377
exts = make(map[string][]byte)
349378
}
350-
id, err := mh.intIdToRequestId(res.Id)
379+
id, err := intIdToRequestId(p, mh.fromV1Map, mh.toV1Map, res.Id)
351380
if err != nil {
352381
return GraphSyncMessage{}, err
353382
}

0 commit comments

Comments
 (0)