Skip to content

Commit 264a2ab

Browse files
committed
feat: optimizations
1 parent 5208f75 commit 264a2ab

3 files changed

Lines changed: 45 additions & 27 deletions

File tree

cmd/p2p/sensor/sensor.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ type (
5858
ShouldBroadcastTxHashes bool
5959
ShouldBroadcastBlocks bool
6060
ShouldBroadcastBlockHashes bool
61+
BroadcastWorkers int
6162
ShouldRunPprof bool
6263
PprofPort uint
6364
ShouldRunPrometheus bool
@@ -218,6 +219,7 @@ var SensorCmd = &cobra.Command{
218219
ShouldBroadcastTxHashes: inputSensorParams.ShouldBroadcastTxHashes,
219220
ShouldBroadcastBlocks: inputSensorParams.ShouldBroadcastBlocks,
220221
ShouldBroadcastBlockHashes: inputSensorParams.ShouldBroadcastBlockHashes,
222+
BroadcastWorkers: inputSensorParams.BroadcastWorkers,
221223
})
222224

223225
opts := p2p.EthProtocolOptions{
@@ -491,6 +493,7 @@ will result in less chance of missing data but can significantly increase memory
491493
f.BoolVar(&inputSensorParams.ShouldBroadcastTxHashes, "broadcast-tx-hashes", false, "broadcast transaction hashes to peers")
492494
f.BoolVar(&inputSensorParams.ShouldBroadcastBlocks, "broadcast-blocks", false, "broadcast full blocks to peers")
493495
f.BoolVar(&inputSensorParams.ShouldBroadcastBlockHashes, "broadcast-block-hashes", false, "broadcast block hashes to peers")
496+
f.IntVar(&inputSensorParams.BroadcastWorkers, "broadcast-workers", 4, "number of concurrent broadcast workers")
494497
f.BoolVar(&inputSensorParams.ShouldRunPprof, "pprof", false, "run pprof server")
495498
f.UintVar(&inputSensorParams.PprofPort, "pprof-port", 6060, "port pprof runs on")
496499
f.BoolVar(&inputSensorParams.ShouldRunPrometheus, "prom", true, "run Prometheus server")

doc/polycli_p2p_sensor.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ polycli p2p sensor amoy-nodes.json \
126126
--broadcast-blocks broadcast full blocks to peers
127127
--broadcast-tx-hashes broadcast transaction hashes to peers
128128
--broadcast-txs broadcast full transactions to peers
129+
--broadcast-workers int number of concurrent broadcast workers (default 4)
129130
--database string which database to persist data to, options are:
130131
- datastore (GCP Datastore)
131132
- json (output to stdout)

p2p/conns.go

Lines changed: 41 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package p2p
22

33
import (
4+
"bytes"
45
"math/big"
56
"sync"
67
"sync/atomic"
@@ -35,6 +36,7 @@ type ConnsOptions struct {
3536
ShouldBroadcastTxHashes bool
3637
ShouldBroadcastBlocks bool
3738
ShouldBroadcastBlockHashes bool
39+
BroadcastWorkers int
3840
}
3941

4042
// Conns manages a collection of active peer connections for transaction broadcasting.
@@ -94,7 +96,15 @@ func NewConns(opts ConnsOptions) *Conns {
9496
shouldBroadcastBlockHashes: opts.ShouldBroadcastBlockHashes,
9597
txBroadcastCh: make(chan types.Transactions, 100000),
9698
}
97-
go c.txBroadcastLoop()
99+
100+
workers := opts.BroadcastWorkers
101+
if workers <= 0 {
102+
workers = 4
103+
}
104+
for i := 0; i < workers; i++ {
105+
go c.txBroadcastLoop()
106+
}
107+
98108
return c
99109
}
100110

@@ -212,44 +222,48 @@ func (c *Conns) BroadcastTxs(txs types.Transactions) int {
212222

213223
// broadcastTxs sends RPC-submitted transactions to all peers via TransactionsMsg.
214224
// Used by txBroadcastLoop to process transactions from EnqueueTxBroadcast.
215-
// Has a timeout to prevent slow peers from blocking the broadcast loop.
216-
func (c *Conns) broadcastTxs(txs types.Transactions, hashes []common.Hash, peers []*conn) int {
225+
func (c *Conns) broadcastTxs(txs types.Transactions, hashes []common.Hash, peers []*conn) {
217226
rawList, err := rlp.EncodeToRawList([]*types.Transaction(txs))
218227
if err != nil {
219228
log.Debug().Err(err).Msg("Failed to encode transactions")
220-
return 0
229+
return
221230
}
222231
packet := &eth.TransactionsPacket{RawList: rawList}
223232

224-
var count atomic.Int32
225-
var wg sync.WaitGroup
233+
// Pre-encode the entire message once to avoid re-encoding for each peer.
234+
encodedMsg, err := rlp.EncodeToBytes(packet)
235+
if err != nil {
236+
log.Debug().Err(err).Msg("Failed to encode message")
237+
return
238+
}
239+
msgSize := uint32(len(encodedMsg))
240+
241+
// Cache loop-invariant values
242+
msgName := packet.Name()
243+
txCount := float64(len(txs))
244+
rebroadcasting := c.shouldBroadcastTx || c.shouldBroadcastTxHashes
226245

227246
for _, peer := range peers {
228-
wg.Go(func() {
229-
peer.countMsgSent(packet.Name(), float64(len(txs)))
230-
if err := ethp2p.Send(peer.rw, eth.TransactionsMsg, packet); err != nil {
247+
go func(peer *conn) {
248+
peer.countMsgSent(msgName, txCount)
249+
250+
// Use WriteMsg directly with pre-encoded bytes instead of Send()
251+
msg := ethp2p.Msg{
252+
Code: eth.TransactionsMsg,
253+
Size: msgSize,
254+
Payload: bytes.NewReader(encodedMsg),
255+
}
256+
if err := peer.rw.WriteMsg(msg); err != nil {
231257
peer.logger.Debug().Err(err).Msg("Failed to send transactions")
232258
return
233259
}
234-
peer.addKnownTxHashes(hashes)
235-
count.Add(1)
236-
})
237-
}
238-
239-
// Wait with timeout
240-
done := make(chan struct{})
241-
go func() {
242-
wg.Wait()
243-
close(done)
244-
}()
245260

246-
select {
247-
case <-done:
248-
case <-time.After(5 * time.Second):
249-
log.Warn().Int("peers", len(peers)).Msg("Broadcast timed out")
261+
// Only track known hashes if rebroadcasting is enabled
262+
if rebroadcasting {
263+
peer.addKnownTxHashes(hashes)
264+
}
265+
}(peer)
250266
}
251-
252-
return int(count.Load())
253267
}
254268

255269
// txBroadcastLoop is the worker that drains the broadcast channel and sends
@@ -297,7 +311,7 @@ func (c *Conns) pullTxBatch() (types.Transactions, []common.Hash) {
297311
}
298312

299313
// Drain more until max size or timeout
300-
timer := time.NewTimer(time.Second)
314+
timer := time.NewTimer(100 * time.Millisecond)
301315
defer timer.Stop()
302316

303317
for batchSize < maxTxPacketSize {

0 commit comments

Comments
 (0)