Skip to content

Commit 6db99fa

Browse files
authored
Merge branch 'main' into minhd-vu/sensor-broadcast
2 parents 5d4ce45 + b77ec6b commit 6db99fa

2 files changed

Lines changed: 124 additions & 20 deletions

File tree

cmd/p2p/sensor/sensor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ var SensorCmd = &cobra.Command{
248248
p2p.NewEthProtocol(66, opts),
249249
p2p.NewEthProtocol(67, opts),
250250
p2p.NewEthProtocol(68, opts),
251+
p2p.NewEthProtocol(69, opts),
251252
},
252253
}
253254

p2p/protocol.go

Lines changed: 123 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -156,17 +156,7 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol {
156156
go c.blockAnnouncementLoop()
157157
}
158158

159-
head := c.conns.HeadBlock()
160-
status := eth.StatusPacket68{
161-
ProtocolVersion: uint32(version),
162-
NetworkID: opts.NetworkID,
163-
Genesis: opts.GenesisHash,
164-
ForkID: opts.ForkID,
165-
Head: head.Block.Hash(),
166-
TD: head.TD,
167-
}
168-
err := c.statusExchange(&status)
169-
if err != nil {
159+
if err := c.statusExchange(version, opts); err != nil {
170160
return err
171161
}
172162

@@ -205,6 +195,8 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol {
205195
err = c.handlePooledTransactions(ctx, msg)
206196
case eth.GetReceiptsMsg:
207197
err = c.handleGetReceipts(msg)
198+
case eth.BlockRangeUpdateMsg:
199+
err = c.handleBlockRangeUpdate(msg)
208200
default:
209201
c.logger.Trace().Interface("msg", msg).Send()
210202
}
@@ -225,18 +217,78 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol {
225217
}
226218
}
227219

228-
// statusExchange will exchange status message between the nodes. It will return
229-
// an error if the nodes are incompatible.
230-
func (c *conn) statusExchange(packet *eth.StatusPacket68) error {
220+
// statusExchange performs the eth protocol handshake, using the appropriate
221+
// status packet format based on the negotiated protocol version.
222+
func (c *conn) statusExchange(version uint, opts EthProtocolOptions) error {
223+
head := c.conns.HeadBlock()
224+
225+
if version >= eth.ETH69 {
226+
status := eth.StatusPacket69{
227+
ProtocolVersion: uint32(version),
228+
NetworkID: opts.NetworkID,
229+
Genesis: opts.GenesisHash,
230+
ForkID: opts.ForkID,
231+
EarliestBlock: head.Block.NumberU64(),
232+
LatestBlock: head.Block.NumberU64(),
233+
LatestBlockHash: head.Block.Hash(),
234+
}
235+
236+
return c.statusExchange69(&status)
237+
}
238+
239+
status := eth.StatusPacket68{
240+
ProtocolVersion: uint32(version),
241+
NetworkID: opts.NetworkID,
242+
Genesis: opts.GenesisHash,
243+
ForkID: opts.ForkID,
244+
Head: head.Block.Hash(),
245+
TD: head.TD,
246+
}
247+
248+
return c.statusExchange68(&status)
249+
}
250+
251+
// statusExchange68 will exchange status message for ETH68 and below.
252+
func (c *conn) statusExchange68(packet *eth.StatusPacket68) error {
231253
errc := make(chan error, 2)
232254

233255
go func() {
234256
c.countMsgSent((&eth.StatusPacket68{}).Name(), 1)
235-
errc <- ethp2p.Send(c.rw, eth.StatusMsg, &packet)
257+
errc <- ethp2p.Send(c.rw, eth.StatusMsg, packet)
258+
}()
259+
260+
go func() {
261+
errc <- c.readStatus68(packet)
262+
}()
263+
264+
timeout := time.NewTimer(5 * time.Second)
265+
defer timeout.Stop()
266+
267+
for range 2 {
268+
select {
269+
case err := <-errc:
270+
if err != nil {
271+
return err
272+
}
273+
case <-timeout.C:
274+
return ethp2p.DiscReadTimeout
275+
}
276+
}
277+
278+
return nil
279+
}
280+
281+
// statusExchange69 will exchange status message for ETH69.
282+
func (c *conn) statusExchange69(packet *eth.StatusPacket69) error {
283+
errc := make(chan error, 2)
284+
285+
go func() {
286+
c.countMsgSent((&eth.StatusPacket69{}).Name(), 1)
287+
errc <- ethp2p.Send(c.rw, eth.StatusMsg, packet)
236288
}()
237289

238290
go func() {
239-
errc <- c.readStatus(packet)
291+
errc <- c.readStatus69(packet)
240292
}()
241293

242294
timeout := time.NewTimer(5 * time.Second)
@@ -276,7 +328,7 @@ func (c *conn) countMsgSent(messageName string, count float64) {
276328
c.messages.IncrementSent(messageName, int64(count))
277329
}
278330

279-
func (c *conn) readStatus(packet *eth.StatusPacket68) error {
331+
func (c *conn) readStatus68(packet *eth.StatusPacket68) error {
280332
msg, err := c.rw.ReadMsg()
281333
if err != nil {
282334
return err
@@ -287,31 +339,82 @@ func (c *conn) readStatus(packet *eth.StatusPacket68) error {
287339
}
288340

289341
var status eth.StatusPacket68
290-
err = msg.Decode(&status)
291-
if err != nil {
342+
if err := msg.Decode(&status); err != nil {
292343
return err
293344
}
294345

295346
if status.NetworkID != packet.NetworkID {
296347
return fmt.Errorf("network ID mismatch: %d (!= %d)", status.NetworkID, packet.NetworkID)
297348
}
298-
299349
if status.Genesis != packet.Genesis {
300350
return fmt.Errorf("genesis mismatch: %v (!= %v)", status.Genesis, packet.Genesis)
301351
}
352+
if status.ForkID.Hash != packet.ForkID.Hash {
353+
return fmt.Errorf("fork ID mismatch: %v (!= %v)", status.ForkID, packet.ForkID)
354+
}
355+
356+
c.logger.Info().
357+
Interface("status", status).
358+
Str("fork_id", hex.EncodeToString(status.ForkID.Hash[:])).
359+
Msg("New peer")
360+
361+
return nil
362+
}
363+
364+
func (c *conn) readStatus69(packet *eth.StatusPacket69) error {
365+
msg, err := c.rw.ReadMsg()
366+
if err != nil {
367+
return err
368+
}
369+
370+
if msg.Code != eth.StatusMsg {
371+
return errors.New("expected status message code")
372+
}
302373

374+
var status eth.StatusPacket69
375+
if err := msg.Decode(&status); err != nil {
376+
return err
377+
}
378+
379+
if status.NetworkID != packet.NetworkID {
380+
return fmt.Errorf("network ID mismatch: %d (!= %d)", status.NetworkID, packet.NetworkID)
381+
}
382+
if status.Genesis != packet.Genesis {
383+
return fmt.Errorf("genesis mismatch: %v (!= %v)", status.Genesis, packet.Genesis)
384+
}
303385
if status.ForkID.Hash != packet.ForkID.Hash {
304386
return fmt.Errorf("fork ID mismatch: %v (!= %v)", status.ForkID, packet.ForkID)
305387
}
306388

307389
c.logger.Info().
308390
Interface("status", status).
309391
Str("fork_id", hex.EncodeToString(status.ForkID.Hash[:])).
392+
Uint64("earliest_block", status.EarliestBlock).
393+
Uint64("latest_block", status.LatestBlock).
310394
Msg("New peer")
311395

312396
return nil
313397
}
314398

399+
// handleBlockRangeUpdate handles BlockRangeUpdateMsg (ETH69).
400+
// This message announces the peer's available block range.
401+
func (c *conn) handleBlockRangeUpdate(msg ethp2p.Msg) error {
402+
var packet eth.BlockRangeUpdatePacket
403+
if err := msg.Decode(&packet); err != nil {
404+
c.logger.Warn().Err(err).Msg("Failed to decode BlockRangeUpdate")
405+
return nil
406+
}
407+
408+
c.countMsgReceived(packet.Name(), 1)
409+
c.logger.Debug().
410+
Uint64("earliest", packet.EarliestBlock).
411+
Uint64("latest", packet.LatestBlock).
412+
Hex("hash", packet.LatestBlockHash[:]).
413+
Msg("Received BlockRangeUpdate")
414+
415+
return nil
416+
}
417+
315418
// getBlockData will send GetBlockHeaders and/or GetBlockBodies requests to the
316419
// peer based on what parts of the block we already have. It will return an error
317420
// if sending either of the requests failed. The isParent parameter indicates if

0 commit comments

Comments
 (0)