Skip to content

Commit a5c6985

Browse files
committed
feat: add configurable tx broadcast queue size
1 parent ba880c4 commit a5c6985

File tree

2 files changed

+10
-1
lines changed

2 files changed

+10
-1
lines changed

cmd/p2p/sensor/sensor.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ type (
5959
ShouldBroadcastBlockHashes bool
6060
BroadcastWorkers int
6161
TxBatchTimeout time.Duration
62+
TxBroadcastQueueSize int
6263
ShouldRunPprof bool
6364
PprofPort uint
6465
ShouldRunPrometheus bool
@@ -221,6 +222,7 @@ var SensorCmd = &cobra.Command{
221222
ShouldBroadcastBlockHashes: inputSensorParams.ShouldBroadcastBlockHashes,
222223
BroadcastWorkers: inputSensorParams.BroadcastWorkers,
223224
TxBatchTimeout: inputSensorParams.TxBatchTimeout,
225+
TxBroadcastQueueSize: inputSensorParams.TxBroadcastQueueSize,
224226
})
225227

226228
opts := p2p.EthProtocolOptions{
@@ -497,6 +499,7 @@ will result in less chance of missing data but can significantly increase memory
497499
f.BoolVar(&inputSensorParams.ShouldBroadcastBlockHashes, "broadcast-block-hashes", false, "broadcast block hashes to peers")
498500
f.IntVar(&inputSensorParams.BroadcastWorkers, "broadcast-workers", 4, "number of concurrent broadcast workers")
499501
f.DurationVar(&inputSensorParams.TxBatchTimeout, "tx-batch-timeout", 500*time.Millisecond, "timeout for batching transactions before broadcast")
502+
f.IntVar(&inputSensorParams.TxBroadcastQueueSize, "tx-broadcast-queue-size", 100000, "capacity of transaction broadcast queue")
500503
f.BoolVar(&inputSensorParams.ShouldRunPprof, "pprof", false, "run pprof server")
501504
f.UintVar(&inputSensorParams.PprofPort, "pprof-port", 6060, "port pprof runs on")
502505
f.BoolVar(&inputSensorParams.ShouldRunPrometheus, "prom", true, "run Prometheus server")

p2p/conns.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type ConnsOptions struct {
3838
ShouldBroadcastBlockHashes bool
3939
BroadcastWorkers int
4040
TxBatchTimeout time.Duration
41+
TxBroadcastQueueSize int
4142
}
4243

4344
// Conns manages a collection of active peer connections for transaction broadcasting.
@@ -91,6 +92,11 @@ func NewConns(opts ConnsOptions) *Conns {
9192
txBatchTimeout = 500 * time.Millisecond
9293
}
9394

95+
txBroadcastQueueSize := opts.TxBroadcastQueueSize
96+
if txBroadcastQueueSize <= 0 {
97+
txBroadcastQueueSize = 100000
98+
}
99+
94100
c := &Conns{
95101
conns: make(map[string]*conn),
96102
blocks: ds.NewLRU[common.Hash, BlockCache](opts.BlocksCache),
@@ -103,7 +109,7 @@ func NewConns(opts ConnsOptions) *Conns {
103109
shouldBroadcastTxHashes: opts.ShouldBroadcastTxHashes,
104110
shouldBroadcastBlocks: opts.ShouldBroadcastBlocks,
105111
shouldBroadcastBlockHashes: opts.ShouldBroadcastBlockHashes,
106-
txBroadcastCh: make(chan types.Transactions, 100000),
112+
txBroadcastCh: make(chan types.Transactions, txBroadcastQueueSize),
107113
txBatchTimeout: txBatchTimeout,
108114
}
109115

0 commit comments

Comments
 (0)