Skip to content

Commit c394b09

Browse files
thinkAfCodGrapeBaBa
authored andcommitted
fix: upgrade utp-go
1 parent caa3895 commit c394b09

5 files changed

Lines changed: 48 additions & 50 deletions

File tree

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ require (
5454
github.com/mattn/go-sqlite3 v1.14.18
5555
github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416
5656
github.com/olekukonko/tablewriter v0.0.5
57-
github.com/optimism-java/utp-go v0.0.0-20241023035141-295a86339e8b
57+
github.com/optimism-java/utp-go v0.0.0-20241110145701-0f0eebf881b3
5858
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7
5959
github.com/pion/stun v0.6.1
6060
github.com/protolambda/bls12-381-util v0.1.0
@@ -149,6 +149,7 @@ require (
149149
github.com/russross/blackfriday/v2 v2.1.0 // indirect
150150
github.com/tklauser/go-sysconf v0.3.12 // indirect
151151
github.com/tklauser/numcpus v0.6.1 // indirect
152+
github.com/valyala/fastrand v1.1.0 // indirect
152153
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
153154
go.uber.org/multierr v1.11.0 // indirect
154155
golang.org/x/mod v0.17.0 // indirect

go.sum

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -421,8 +421,8 @@ github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
421421
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
422422
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
423423
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
424-
github.com/optimism-java/utp-go v0.0.0-20241023035141-295a86339e8b h1:WAb+ccelzvqShjfzv2mQQMNK0BTmSW1fzDlE8WrMYpQ=
425-
github.com/optimism-java/utp-go v0.0.0-20241023035141-295a86339e8b/go.mod h1:DZ0jYzLzt4ZsCmhI/iqYgGFoNx45OfpEoKzXB8HVALQ=
424+
github.com/optimism-java/utp-go v0.0.0-20241110145701-0f0eebf881b3 h1:KfAZ//Sxrqulozmw4QoC8jY3h4I5diWXWPmZ1gptvGY=
425+
github.com/optimism-java/utp-go v0.0.0-20241110145701-0f0eebf881b3/go.mod h1:dJZNMUlyNpjM2VkUEHhmFprLei6gCg3r7U9qj9MmJNQ=
426426
github.com/optimism-java/zrnt v0.32.4-0.20240415084906-d9dbf06b32f7 h1:ZTQWXQ8xblCRUXhZs3h5qrBMSAHe8iNH7BG7a7IVFlI=
427427
github.com/optimism-java/zrnt v0.32.4-0.20240415084906-d9dbf06b32f7/go.mod h1:A0fezkp9Tt3GBLATSPIbuY4ywYESyAuc/FFmPKg8Lqs=
428428
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 h1:oYW+YCJ1pachXTQmzR3rNLYGGz4g/UgFcjb28p/viDM=
@@ -524,6 +524,8 @@ github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9f
524524
github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs=
525525
github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ=
526526
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
527+
github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8=
528+
github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
527529
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
528530
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
529531
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=

p2p/discover/portal_protocol.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -582,7 +582,7 @@ func (p *PortalProtocol) processOffer(target *enode.Node, resp []byte, request *
582582
}
583583

584584
connctx, conncancel := context.WithTimeout(ctx, defaultUTPConnectTimeout)
585-
conn, err = p.Utp.DialWithCid(connctx, target, connId)
585+
conn, err = p.Utp.DialWithCid(connctx, target, libutp.ReceConnId(connId).SendId())
586586
conncancel()
587587
if err != nil {
588588
if metrics.Enabled {
@@ -672,7 +672,7 @@ func (p *PortalProtocol) processContent(target *enode.Node, resp []byte) (byte,
672672
}
673673
connctx, conncancel := context.WithTimeout(p.closeCtx, defaultUTPConnectTimeout)
674674
connId := binary.BigEndian.Uint16(connIdMsg.Id[:])
675-
conn, err := p.Utp.DialWithCid(connctx, target, connId)
675+
conn, err := p.Utp.DialWithCid(connctx, target, libutp.ReceConnId(connId).SendId())
676676
defer func() {
677677
if conn == nil {
678678
if metrics.Enabled {
@@ -1105,9 +1105,9 @@ func (p *PortalProtocol) handleFindContent(id enode.ID, addr *net.UDPAddr, reque
11051105
case <-bctx.Done():
11061106
return
11071107
default:
1108-
p.Log.Debug("will accept find content conn from: ", "source", addr, "connId", connId)
1108+
p.Log.Debug("will accept find content conn from: ", "nodeId", id.String(), "source", addr, "connId", connId)
11091109
connectCtx, cancel = context.WithTimeout(bctx, defaultUTPConnectTimeout)
1110-
conn, err = p.Utp.AcceptWithCid(connectCtx, id, uint16(connectionId.SendId()))
1110+
conn, err = p.Utp.AcceptWithCid(connectCtx, id, connectionId)
11111111
cancel()
11121112
if err != nil {
11131113
if metrics.Enabled {
@@ -1239,7 +1239,7 @@ func (p *PortalProtocol) handleOffer(id enode.ID, addr *net.UDPAddr, request *po
12391239
default:
12401240
p.Log.Debug("will accept offer conn from: ", "source", addr, "connId", connId)
12411241
connectCtx, cancel = context.WithTimeout(bctx, defaultUTPConnectTimeout)
1242-
conn, err = p.Utp.AcceptWithCid(connectCtx, id, uint16(connectionId.SendId()))
1242+
conn, err = p.Utp.AcceptWithCid(connectCtx, id, connectionId)
12431243
cancel()
12441244
if err != nil {
12451245
if metrics.Enabled {

p2p/discover/portal_protocol_test.go

Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/ethereum/go-ethereum/portalnetwork/storage"
1515
"github.com/optimism-java/utp-go"
16+
"github.com/optimism-java/utp-go/libutp"
1617
"github.com/prysmaticlabs/go-bitfield"
1718
"golang.org/x/exp/slices"
1819

@@ -126,19 +127,10 @@ func TestPortalWireProtocolUdp(t *testing.T) {
126127
assert.NoError(t, err)
127128
time.Sleep(12 * time.Second)
128129

129-
udpAddrStr1 := fmt.Sprintf("%s:%d", node1.localNode.Node().IP(), node1.localNode.Node().UDP())
130-
udpAddrStr2 := fmt.Sprintf("%s:%d", node2.localNode.Node().IP(), node2.localNode.Node().UDP())
131-
132-
node1Addr, _ := utp.ResolveUTPAddr("utp", udpAddrStr1)
133-
node2Addr, _ := utp.ResolveUTPAddr("utp", udpAddrStr2)
134-
fmt.Println(udpAddrStr1)
135-
fmt.Println(udpAddrStr2)
136-
fmt.Println(node1Addr)
137-
fmt.Println(node2Addr)
138-
139-
cid := uint16(12)
140-
cliSendMsgWithCid := "there are connection id : 12!"
141-
cliSendMsgWithRandomCid := "there are connection id: random!"
130+
cid1 := libutp.ReceConnId(12)
131+
cid2 := libutp.ReceConnId(116)
132+
cliSendMsgWithCid1 := "there are connection id : 12!"
133+
cliSendMsgWithCid2 := "there are connection id: 116!"
142134

143135
serverEchoWithCid := "accept connection sends back msg: echo"
144136

@@ -156,7 +148,7 @@ func TestPortalWireProtocolUdp(t *testing.T) {
156148
workGroup.Done()
157149
_ = acceptConn.Close()
158150
}()
159-
acceptConn, err := node1.Utp.AcceptWithCid(context.Background(), node2.localNode.ID(), cid)
151+
acceptConn, err := node1.Utp.AcceptWithCid(context.Background(), node2.localNode.ID(), cid1)
160152
if err != nil {
161153
panic(err)
162154
}
@@ -166,30 +158,30 @@ func TestPortalWireProtocolUdp(t *testing.T) {
166158
if err != nil && err != io.EOF {
167159
panic(err)
168160
}
169-
assert.Equal(t, cliSendMsgWithCid, string(buf[:n]))
161+
assert.Equal(t, cliSendMsgWithCid1, string(buf[:n]))
170162
_, err = acceptConn.Write([]byte(serverEchoWithCid))
171163
if err != nil {
172164
panic(err)
173165
}
174166
}()
175167
go func() {
176-
var randomConnIdConn net.Conn
168+
var connId2Conn net.Conn
177169
defer func() {
178170
workGroup.Done()
179-
_ = randomConnIdConn.Close()
171+
_ = connId2Conn.Close()
180172
}()
181-
randomConnIdConn, err := node1.Utp.Accept(context.Background())
173+
connId2Conn, err := node1.Utp.AcceptWithCid(context.Background(), node2.localNode.ID(), cid2)
182174
if err != nil {
183175
panic(err)
184176
}
185177
buf := make([]byte, 100)
186-
n, err := randomConnIdConn.Read(buf)
178+
n, err := connId2Conn.Read(buf)
187179
if err != nil && err != io.EOF {
188180
panic(err)
189181
}
190-
assert.Equal(t, cliSendMsgWithRandomCid, string(buf[:n]))
182+
assert.Equal(t, cliSendMsgWithCid2, string(buf[:n]))
191183

192-
_, err = randomConnIdConn.Write(largeTestContent)
184+
_, err = connId2Conn.Write(largeTestContent)
193185
if err != nil {
194186
panic(err)
195187
}
@@ -203,11 +195,11 @@ func TestPortalWireProtocolUdp(t *testing.T) {
203195
_ = connWithConnId.Close()
204196
}
205197
}()
206-
connWithConnId, err = node2.Utp.DialWithCid(context.Background(), node1.localNode.Node(), cid)
198+
connWithConnId, err = node2.Utp.DialWithCid(context.Background(), node1.localNode.Node(), uint16(cid1.SendId()))
207199
if err != nil {
208200
panic(err)
209201
}
210-
_, err = connWithConnId.Write([]byte("there are connection id : 12!"))
202+
_, err = connWithConnId.Write([]byte(cliSendMsgWithCid1))
211203
if err != nil && err != io.EOF {
212204
panic(err)
213205
}
@@ -219,18 +211,18 @@ func TestPortalWireProtocolUdp(t *testing.T) {
219211
assert.Equal(t, serverEchoWithCid, string(buf[:n]))
220212
}()
221213
go func() {
222-
var randomConnIdConn net.Conn
214+
var ConnId2Conn net.Conn
223215
defer func() {
224216
workGroup.Done()
225-
if randomConnIdConn != nil {
226-
_ = randomConnIdConn.Close()
217+
if ConnId2Conn != nil {
218+
_ = ConnId2Conn.Close()
227219
}
228220
}()
229-
randomConnIdConn, err = node2.Utp.Dial(context.Background(), node1.localNode.Node())
221+
ConnId2Conn, err = node2.Utp.DialWithCid(context.Background(), node1.localNode.Node(), uint16(cid2.SendId()))
230222
if err != nil && err != io.EOF {
231223
panic(err)
232224
}
233-
_, err = randomConnIdConn.Write([]byte(cliSendMsgWithRandomCid))
225+
_, err = ConnId2Conn.Write([]byte(cliSendMsgWithCid2))
234226
if err != nil {
235227
panic(err)
236228
}
@@ -239,7 +231,7 @@ func TestPortalWireProtocolUdp(t *testing.T) {
239231
buf := make([]byte, 1024)
240232
for {
241233
var n int
242-
n, err = randomConnIdConn.Read(buf)
234+
n, err = ConnId2Conn.Read(buf)
243235
if err != nil {
244236
if errors.Is(err, io.EOF) {
245237
break

p2p/discover/portal_utp.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/ethereum/go-ethereum/p2p/enode"
1414
"github.com/ethereum/go-ethereum/p2p/netutil"
1515
"github.com/optimism-java/utp-go"
16+
"github.com/optimism-java/utp-go/libutp"
1617
"go.uber.org/zap"
1718
)
1819

@@ -32,10 +33,11 @@ type PortalUtp struct {
3233

3334
func NewPortalUtp(ctx context.Context, config *PortalProtocolConfig, discV5 *UDPv5, conn UDPConn) *PortalUtp {
3435
return &PortalUtp{
35-
ctx: ctx,
36-
log: log.New("protocol", "utp"),
37-
discV5: discV5,
38-
conn: conn,
36+
ctx: ctx,
37+
log: log.New("protocol", "utp", "local", conn.LocalAddr().String()),
38+
discV5: discV5,
39+
conn: conn,
40+
ListenAddr: config.ListenAddr,
3941
}
4042
}
4143

@@ -95,8 +97,8 @@ func (p *PortalUtp) Stop() {
9597

9698
func (p *PortalUtp) DialWithCid(ctx context.Context, dest *enode.Node, connId uint16) (net.Conn, error) {
9799
raddr := &utp.Addr{IP: dest.IP(), Port: dest.UDP()}
98-
p.log.Info("will connect to: ", "addr", raddr.String(), "connId", connId)
99-
conn, err := utp.DialUTPOptions("utp", p.lAddr, raddr, utp.WithContext(ctx), utp.WithSocketManager(p.utpSm), utp.WithConnId(uint32(connId)))
100+
p.log.Debug("will connect to: ", "nodeId", dest.ID().String(), "connId", connId)
101+
conn, err := utp.DialUTPOptions("utp", p.lAddr, raddr, utp.WithContext(ctx), utp.WithSocketManager(p.utpSm), utp.WithConnId(connId))
100102
return conn, err
101103
}
102104

@@ -107,12 +109,13 @@ func (p *PortalUtp) Dial(ctx context.Context, dest *enode.Node) (net.Conn, error
107109
return conn, err
108110
}
109111

110-
func (p *PortalUtp) AcceptWithCid(ctx context.Context, nodeId enode.ID, cid uint16) (*utp.Conn, error) {
111-
return p.listener.AcceptUTPContext(ctx, uint32(cid))
112+
func (p *PortalUtp) AcceptWithCid(ctx context.Context, nodeId enode.ID, cid *libutp.ConnId) (*utp.Conn, error) {
113+
p.log.Debug("will accept from: ", "nodeId", nodeId.String(), "sendId", cid.SendId(), "recvId", cid.RecvId())
114+
return p.listener.AcceptUTPContext(ctx, nodeId, cid)
112115
}
113116

114117
func (p *PortalUtp) Accept(ctx context.Context) (*utp.Conn, error) {
115-
return p.listener.AcceptUTPContext(ctx, 0)
118+
return p.listener.AcceptUTPContext(ctx, enode.ID{}, nil)
116119
}
117120

118121
func (p *PortalUtp) getLocalAddr() *net.UDPAddr {
@@ -121,8 +124,8 @@ func (p *PortalUtp) getLocalAddr() *net.UDPAddr {
121124
return laddr
122125
}
123126

124-
func (p *PortalUtp) packetRouterFunc(buf []byte, addr *net.UDPAddr) (int, error) {
125-
p.log.Info("will send to target data", "ip", addr.IP.To4().String(), "port", addr.Port, "bufLength", len(buf))
127+
func (p *PortalUtp) packetRouterFunc(buf []byte, id enode.ID, addr *net.UDPAddr) (int, error) {
128+
p.log.Info("will send to target data", "nodeId", id.String(), "ip", addr.IP.To4().String(), "port", addr.Port, "bufLength", len(buf))
126129

127130
if n, ok := p.discV5.GetCachedNode(addr.String()); ok {
128131
//_, err := p.DiscV5.TalkRequestToID(id, addr, string(portalwire.UTPNetwork), buf)
@@ -137,7 +140,7 @@ func (p *PortalUtp) packetRouterFunc(buf []byte, addr *net.UDPAddr) (int, error)
137140
}
138141

139142
func (p *PortalUtp) handleUtpTalkRequest(id enode.ID, addr *net.UDPAddr, msg []byte) []byte {
140-
p.log.Trace("receive utp data", "addr", addr, "msg-length", len(msg))
141-
p.packetRouter.ReceiveMessage(msg, addr)
143+
p.log.Trace("receive utp data", "nodeId", id.String(), "addr", addr, "msg-length", len(msg))
144+
p.packetRouter.ReceiveMessage(msg, &utp.NodeInfo{Id: id, Addr: addr})
142145
return []byte("")
143146
}

0 commit comments

Comments
 (0)