Skip to content

Commit b77ec6b

Browse files
authored
feat(sensor): support eth/69 (#863)
1 parent b8c36d7 commit b77ec6b

2 files changed

Lines changed: 124 additions & 20 deletions

File tree

cmd/p2p/sensor/sensor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ var SensorCmd = &cobra.Command{
228228
p2p.NewEthProtocol(66, opts),
229229
p2p.NewEthProtocol(67, opts),
230230
p2p.NewEthProtocol(68, opts),
231+
p2p.NewEthProtocol(69, opts),
231232
},
232233
}
233234

p2p/protocol.go

Lines changed: 123 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -96,17 +96,7 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol {
9696
messages: NewPeerMessages(),
9797
}
9898

99-
head := c.conns.HeadBlock()
100-
status := eth.StatusPacket68{
101-
ProtocolVersion: uint32(version),
102-
NetworkID: opts.NetworkID,
103-
Genesis: opts.GenesisHash,
104-
ForkID: opts.ForkID,
105-
Head: head.Block.Hash(),
106-
TD: head.TD,
107-
}
108-
err := c.statusExchange(&status)
109-
if err != nil {
99+
if err := c.statusExchange(version, opts); err != nil {
110100
return err
111101
}
112102

@@ -146,6 +136,8 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol {
146136
err = c.handlePooledTransactions(ctx, msg)
147137
case eth.GetReceiptsMsg:
148138
err = c.handleGetReceipts(msg)
139+
case eth.BlockRangeUpdateMsg:
140+
err = c.handleBlockRangeUpdate(msg)
149141
default:
150142
c.logger.Trace().Interface("msg", msg).Send()
151143
}
@@ -166,18 +158,78 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol {
166158
}
167159
}
168160

169-
// statusExchange will exchange status message between the nodes. It will return
170-
// an error if the nodes are incompatible.
171-
func (c *conn) statusExchange(packet *eth.StatusPacket68) error {
161+
// statusExchange performs the eth protocol handshake, using the appropriate
162+
// status packet format based on the negotiated protocol version.
163+
func (c *conn) statusExchange(version uint, opts EthProtocolOptions) error {
164+
head := c.conns.HeadBlock()
165+
166+
if version >= eth.ETH69 {
167+
status := eth.StatusPacket69{
168+
ProtocolVersion: uint32(version),
169+
NetworkID: opts.NetworkID,
170+
Genesis: opts.GenesisHash,
171+
ForkID: opts.ForkID,
172+
EarliestBlock: head.Block.NumberU64(),
173+
LatestBlock: head.Block.NumberU64(),
174+
LatestBlockHash: head.Block.Hash(),
175+
}
176+
177+
return c.statusExchange69(&status)
178+
}
179+
180+
status := eth.StatusPacket68{
181+
ProtocolVersion: uint32(version),
182+
NetworkID: opts.NetworkID,
183+
Genesis: opts.GenesisHash,
184+
ForkID: opts.ForkID,
185+
Head: head.Block.Hash(),
186+
TD: head.TD,
187+
}
188+
189+
return c.statusExchange68(&status)
190+
}
191+
192+
// statusExchange68 will exchange status message for ETH68 and below.
193+
func (c *conn) statusExchange68(packet *eth.StatusPacket68) error {
172194
errc := make(chan error, 2)
173195

174196
go func() {
175197
c.countMsgSent((&eth.StatusPacket68{}).Name(), 1)
176-
errc <- ethp2p.Send(c.rw, eth.StatusMsg, &packet)
198+
errc <- ethp2p.Send(c.rw, eth.StatusMsg, packet)
199+
}()
200+
201+
go func() {
202+
errc <- c.readStatus68(packet)
203+
}()
204+
205+
timeout := time.NewTimer(5 * time.Second)
206+
defer timeout.Stop()
207+
208+
for range 2 {
209+
select {
210+
case err := <-errc:
211+
if err != nil {
212+
return err
213+
}
214+
case <-timeout.C:
215+
return ethp2p.DiscReadTimeout
216+
}
217+
}
218+
219+
return nil
220+
}
221+
222+
// statusExchange69 will exchange status message for ETH69.
223+
func (c *conn) statusExchange69(packet *eth.StatusPacket69) error {
224+
errc := make(chan error, 2)
225+
226+
go func() {
227+
c.countMsgSent((&eth.StatusPacket69{}).Name(), 1)
228+
errc <- ethp2p.Send(c.rw, eth.StatusMsg, packet)
177229
}()
178230

179231
go func() {
180-
errc <- c.readStatus(packet)
232+
errc <- c.readStatus69(packet)
181233
}()
182234

183235
timeout := time.NewTimer(5 * time.Second)
@@ -217,7 +269,7 @@ func (c *conn) countMsgSent(messageName string, count float64) {
217269
c.messages.IncrementSent(messageName, int64(count))
218270
}
219271

220-
func (c *conn) readStatus(packet *eth.StatusPacket68) error {
272+
func (c *conn) readStatus68(packet *eth.StatusPacket68) error {
221273
msg, err := c.rw.ReadMsg()
222274
if err != nil {
223275
return err
@@ -228,31 +280,82 @@ func (c *conn) readStatus(packet *eth.StatusPacket68) error {
228280
}
229281

230282
var status eth.StatusPacket68
231-
err = msg.Decode(&status)
232-
if err != nil {
283+
if err := msg.Decode(&status); err != nil {
233284
return err
234285
}
235286

236287
if status.NetworkID != packet.NetworkID {
237288
return fmt.Errorf("network ID mismatch: %d (!= %d)", status.NetworkID, packet.NetworkID)
238289
}
239-
240290
if status.Genesis != packet.Genesis {
241291
return fmt.Errorf("genesis mismatch: %v (!= %v)", status.Genesis, packet.Genesis)
242292
}
293+
if status.ForkID.Hash != packet.ForkID.Hash {
294+
return fmt.Errorf("fork ID mismatch: %v (!= %v)", status.ForkID, packet.ForkID)
295+
}
296+
297+
c.logger.Info().
298+
Interface("status", status).
299+
Str("fork_id", hex.EncodeToString(status.ForkID.Hash[:])).
300+
Msg("New peer")
301+
302+
return nil
303+
}
304+
305+
func (c *conn) readStatus69(packet *eth.StatusPacket69) error {
306+
msg, err := c.rw.ReadMsg()
307+
if err != nil {
308+
return err
309+
}
310+
311+
if msg.Code != eth.StatusMsg {
312+
return errors.New("expected status message code")
313+
}
243314

315+
var status eth.StatusPacket69
316+
if err := msg.Decode(&status); err != nil {
317+
return err
318+
}
319+
320+
if status.NetworkID != packet.NetworkID {
321+
return fmt.Errorf("network ID mismatch: %d (!= %d)", status.NetworkID, packet.NetworkID)
322+
}
323+
if status.Genesis != packet.Genesis {
324+
return fmt.Errorf("genesis mismatch: %v (!= %v)", status.Genesis, packet.Genesis)
325+
}
244326
if status.ForkID.Hash != packet.ForkID.Hash {
245327
return fmt.Errorf("fork ID mismatch: %v (!= %v)", status.ForkID, packet.ForkID)
246328
}
247329

248330
c.logger.Info().
249331
Interface("status", status).
250332
Str("fork_id", hex.EncodeToString(status.ForkID.Hash[:])).
333+
Uint64("earliest_block", status.EarliestBlock).
334+
Uint64("latest_block", status.LatestBlock).
251335
Msg("New peer")
252336

253337
return nil
254338
}
255339

340+
// handleBlockRangeUpdate handles BlockRangeUpdateMsg (ETH69).
341+
// This message announces the peer's available block range.
342+
func (c *conn) handleBlockRangeUpdate(msg ethp2p.Msg) error {
343+
var packet eth.BlockRangeUpdatePacket
344+
if err := msg.Decode(&packet); err != nil {
345+
c.logger.Warn().Err(err).Msg("Failed to decode BlockRangeUpdate")
346+
return nil
347+
}
348+
349+
c.countMsgReceived(packet.Name(), 1)
350+
c.logger.Debug().
351+
Uint64("earliest", packet.EarliestBlock).
352+
Uint64("latest", packet.LatestBlock).
353+
Hex("hash", packet.LatestBlockHash[:]).
354+
Msg("Received BlockRangeUpdate")
355+
356+
return nil
357+
}
358+
256359
// getBlockData will send GetBlockHeaders and/or GetBlockBodies requests to the
257360
// peer based on what parts of the block we already have. It will return an error
258361
// if sending either of the requests failed. The isParent parameter indicates if

0 commit comments

Comments
 (0)