Skip to content

Commit 161a577

Browse files
committed
feat(net): activate v2 network as default
1 parent fc58e23 commit 161a577

5 files changed

Lines changed: 71 additions & 52 deletions

File tree

impl/graphsync_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,9 @@ var protocolsForTest = map[string]struct {
6363
host1Protocols []protocol.ID
6464
host2Protocols []protocol.ID
6565
}{
66-
"(v1.1 -> v1.1)": {nil, nil},
67-
"(v1.0 -> v1.1)": {[]protocol.ID{gsnet.ProtocolGraphsync_1_0_0}, nil},
68-
"(v1.1 -> v1.0)": {nil, []protocol.ID{gsnet.ProtocolGraphsync_1_0_0}},
66+
"(v2.0 -> v2.0)": {nil, nil},
67+
"(v1.0 -> v2.0)": {[]protocol.ID{gsnet.ProtocolGraphsync_1_0_0}, nil},
68+
"(v2.0 -> v1.0)": {nil, []protocol.ID{gsnet.ProtocolGraphsync_1_0_0}},
6969
"(v1.0 -> v1.0)": {[]protocol.ID{gsnet.ProtocolGraphsync_1_0_0}, []protocol.ID{gsnet.ProtocolGraphsync_1_0_0}},
7070
}
7171

message/v1/message.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,17 @@ import (
88

99
blocks "github.com/ipfs/go-block-format"
1010
"github.com/ipfs/go-cid"
11-
"github.com/ipfs/go-graphsync"
12-
"github.com/ipfs/go-graphsync/ipldutil"
13-
"github.com/ipfs/go-graphsync/message"
14-
pb "github.com/ipfs/go-graphsync/message/pb"
1511
"github.com/ipld/go-ipld-prime/datamodel"
1612
pool "github.com/libp2p/go-buffer-pool"
1713
"github.com/libp2p/go-libp2p-core/network"
1814
"github.com/libp2p/go-libp2p-core/peer"
1915
"github.com/libp2p/go-msgio"
2016
"google.golang.org/protobuf/proto"
17+
18+
"github.com/ipfs/go-graphsync"
19+
"github.com/ipfs/go-graphsync/ipldutil"
20+
"github.com/ipfs/go-graphsync/message"
21+
pb "github.com/ipfs/go-graphsync/message/pb"
2122
)
2223

2324
type MessagePartWithExtensions interface {

message/v2/message.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,16 @@ import (
77

88
blocks "github.com/ipfs/go-block-format"
99
"github.com/ipfs/go-cid"
10-
"github.com/ipfs/go-graphsync"
11-
"github.com/ipfs/go-graphsync/message"
12-
"github.com/ipfs/go-graphsync/message/ipldbind"
1310
"github.com/ipld/go-ipld-prime/codec/dagcbor"
1411
"github.com/ipld/go-ipld-prime/datamodel"
1512
"github.com/ipld/go-ipld-prime/node/bindnode"
1613
"github.com/libp2p/go-libp2p-core/network"
1714
"github.com/libp2p/go-libp2p-core/peer"
1815
"github.com/libp2p/go-msgio"
16+
17+
"github.com/ipfs/go-graphsync"
18+
"github.com/ipfs/go-graphsync/message"
19+
"github.com/ipfs/go-graphsync/message/ipldbind"
1920
)
2021

2122
type MessageHandler struct{}

network/libp2p_impl.go

Lines changed: 57 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
gsmsg "github.com/ipfs/go-graphsync/message"
1818
gsmsgv1 "github.com/ipfs/go-graphsync/message/v1"
19+
gsmsgv2 "github.com/ipfs/go-graphsync/message/v2"
1920
)
2021

2122
var log = logging.Logger("graphsync_network")
@@ -35,10 +36,14 @@ func GraphsyncProtocols(protocols []protocol.ID) Option {
3536

3637
// NewFromLibp2pHost returns a GraphSyncNetwork supported by underlying Libp2p host.
3738
func NewFromLibp2pHost(host host.Host, options ...Option) GraphSyncNetwork {
39+
messageHandlerSelector := messageHandlerSelector{
40+
v1MessageHandler: gsmsgv1.NewMessageHandler(),
41+
v2MessageHandler: gsmsgv2.NewMessageHandler(),
42+
}
3843
graphSyncNetwork := libp2pGraphSyncNetwork{
39-
host: host,
40-
messageHandler: gsmsgv1.NewMessageHandler(),
41-
protocols: []protocol.ID{ProtocolGraphsync_1_0_0, ProtocolGraphsync_2_0_0},
44+
host: host,
45+
messageHandlerSelector: &messageHandlerSelector,
46+
protocols: []protocol.ID{ProtocolGraphsync_2_0_0, ProtocolGraphsync_1_0_0},
4247
}
4348

4449
for _, option := range options {
@@ -48,20 +53,53 @@ func NewFromLibp2pHost(host host.Host, options ...Option) GraphSyncNetwork {
4853
return &graphSyncNetwork
4954
}
5055

56+
// a message.MessageHandler that simply returns an error for any of the calls, allows
57+
// us to simplify erroring on bad protocol within the messageHandlerSelector#Select()
58+
// call so we only have one place to be strict about allowed versions
59+
type messageHandlerErrorer struct {
60+
err error
61+
}
62+
63+
func (mhe messageHandlerErrorer) FromNet(peer.ID, io.Reader) (gsmsg.GraphSyncMessage, error) {
64+
return gsmsg.GraphSyncMessage{}, mhe.err
65+
}
66+
func (mhe messageHandlerErrorer) FromMsgReader(peer.ID, msgio.Reader) (gsmsg.GraphSyncMessage, error) {
67+
return gsmsg.GraphSyncMessage{}, mhe.err
68+
}
69+
func (mhe messageHandlerErrorer) ToNet(peer.ID, gsmsg.GraphSyncMessage, io.Writer) error {
70+
return mhe.err
71+
}
72+
73+
type messageHandlerSelector struct {
74+
v1MessageHandler gsmsg.MessageHandler
75+
v2MessageHandler gsmsg.MessageHandler
76+
}
77+
78+
func (smh messageHandlerSelector) Select(protocol protocol.ID) gsmsg.MessageHandler {
79+
switch protocol {
80+
case ProtocolGraphsync_1_0_0:
81+
return smh.v1MessageHandler
82+
case ProtocolGraphsync_2_0_0:
83+
return smh.v2MessageHandler
84+
default:
85+
return messageHandlerErrorer{fmt.Errorf("unrecognized protocol version: %s", protocol)}
86+
}
87+
}
88+
5189
// libp2pGraphSyncNetwork transforms the libp2p host interface, which sends and receives
5290
// NetMessage objects, into the graphsync network interface.
5391
type libp2pGraphSyncNetwork struct {
5492
host host.Host
5593
// inbound messages from the network are forwarded to the receiver
56-
receiver Receiver
57-
messageHandler gsmsg.MessageHandler
58-
protocols []protocol.ID
94+
receiver Receiver
95+
protocols []protocol.ID
96+
messageHandlerSelector *messageHandlerSelector
5997
}
6098

6199
type streamMessageSender struct {
62-
s network.Stream
63-
opts MessageSenderOpts
64-
messageHandler gsmsg.MessageHandler
100+
s network.Stream
101+
opts MessageSenderOpts
102+
messageHandlerSelector *messageHandlerSelector
65103
}
66104

67105
func (s *streamMessageSender) Close() error {
@@ -73,10 +111,10 @@ func (s *streamMessageSender) Reset() error {
73111
}
74112

75113
func (s *streamMessageSender) SendMsg(ctx context.Context, msg gsmsg.GraphSyncMessage) error {
76-
return msgToStream(ctx, s.s, s.messageHandler, msg, s.opts.SendTimeout)
114+
return msgToStream(ctx, s.s, s.messageHandlerSelector, msg, s.opts.SendTimeout)
77115
}
78116

79-
func msgToStream(ctx context.Context, s network.Stream, mh gsmsg.MessageHandler, msg gsmsg.GraphSyncMessage, timeout time.Duration) error {
117+
func msgToStream(ctx context.Context, s network.Stream, mh *messageHandlerSelector, msg gsmsg.GraphSyncMessage, timeout time.Duration) error {
80118
log.Debugf("Outgoing message with %d requests, %d responses, and %d blocks",
81119
len(msg.Requests()), len(msg.Responses()), len(msg.Blocks()))
82120

@@ -88,19 +126,9 @@ func msgToStream(ctx context.Context, s network.Stream, mh gsmsg.MessageHandler,
88126
log.Warnf("error setting deadline: %s", err)
89127
}
90128

91-
switch s.Protocol() {
92-
case ProtocolGraphsync_1_0_0:
93-
if err := mh.ToNet(s.Conn().RemotePeer(), msg, s); err != nil {
94-
log.Debugf("error: %s", err)
95-
return err
96-
}
97-
case ProtocolGraphsync_2_0_0:
98-
if err := mh.ToNet(s.Conn().RemotePeer(), msg, s); err != nil {
99-
log.Debugf("error: %s", err)
100-
return err
101-
}
102-
default:
103-
return fmt.Errorf("unrecognized protocol on remote: %s", s.Protocol())
129+
if err := mh.Select(s.Protocol()).ToNet(s.Conn().RemotePeer(), msg, s); err != nil {
130+
log.Debugf("error: %s", err)
131+
return err
104132
}
105133

106134
if err := s.SetWriteDeadline(time.Time{}); err != nil {
@@ -116,9 +144,9 @@ func (gsnet *libp2pGraphSyncNetwork) NewMessageSender(ctx context.Context, p pee
116144
}
117145

118146
return &streamMessageSender{
119-
s: s,
120-
opts: setDefaults(opts),
121-
messageHandler: gsnet.messageHandler,
147+
s: s,
148+
opts: setDefaults(opts),
149+
messageHandlerSelector: gsnet.messageHandlerSelector,
122150
}, nil
123151
}
124152

@@ -136,7 +164,7 @@ func (gsnet *libp2pGraphSyncNetwork) SendMessage(
136164
return err
137165
}
138166

139-
if err = msgToStream(ctx, s, gsnet.messageHandler, outgoing, sendMessageTimeout); err != nil {
167+
if err = msgToStream(ctx, s, gsnet.messageHandlerSelector, outgoing, sendMessageTimeout); err != nil {
140168
_ = s.Reset()
141169
return err
142170
}
@@ -167,16 +195,7 @@ func (gsnet *libp2pGraphSyncNetwork) handleNewStream(s network.Stream) {
167195

168196
reader := msgio.NewVarintReaderSize(s, network.MessageSizeMax)
169197
for {
170-
var received gsmsg.GraphSyncMessage
171-
var err error
172-
switch s.Protocol() {
173-
case ProtocolGraphsync_1_0_0:
174-
received, err = gsnet.messageHandler.FromMsgReader(s.Conn().RemotePeer(), reader)
175-
case ProtocolGraphsync_2_0_0:
176-
received, err = gsnet.messageHandler.FromMsgReader(s.Conn().RemotePeer(), reader)
177-
default:
178-
err = fmt.Errorf("unexpected protocol version %s", s.Protocol())
179-
}
198+
received, err := gsnet.messageHandlerSelector.Select(s.Protocol()).FromMsgReader(s.Conn().RemotePeer(), reader)
180199
p := s.Conn().RemotePeer()
181200

182201
if err != nil {

network/libp2p_impl_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,7 @@ func TestMessageSendAndReceive(t *testing.T) {
106106
receivedRequests := received.Requests()
107107
require.Len(t, receivedRequests, 1, "did not add request to received message")
108108
receivedRequest := receivedRequests[0]
109-
// TODO: for protocol v1 this shouldn't match, but for v2 it should
110-
// require.Equal(t, sentRequest.ID(), receivedRequest.ID())
109+
require.Equal(t, sentRequest.ID(), receivedRequest.ID())
111110
require.Equal(t, sentRequest.IsCancel(), receivedRequest.IsCancel())
112111
require.Equal(t, sentRequest.Priority(), receivedRequest.Priority())
113112
require.Equal(t, sentRequest.Root().String(), receivedRequest.Root().String())
@@ -120,8 +119,7 @@ func TestMessageSendAndReceive(t *testing.T) {
120119
require.Len(t, receivedResponses, 1, "did not add response to received message")
121120
receivedResponse := receivedResponses[0]
122121
extensionData, found := receivedResponse.Extension(extensionName)
123-
// TODO: for protocol v1 this shouldn't match, but for v2 it should
124-
// require.Equal(t, sentResponse.RequestID(), receivedResponse.RequestID())
122+
require.Equal(t, sentResponse.RequestID(), receivedResponse.RequestID())
125123
require.Equal(t, sentResponse.Status(), receivedResponse.Status())
126124
require.True(t, found)
127125
require.Equal(t, extension.Data, extensionData)

0 commit comments

Comments
 (0)