Skip to content

Commit 0cdb156

Browse files
committed
feat: add configurable tx batch timeout with 500ms default
1 parent 6bb4988 commit 0cdb156

2 files changed

Lines changed: 14 additions & 1 deletion

File tree

cmd/p2p/sensor/sensor.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ type (
5858
ShouldBroadcastBlocks bool
5959
ShouldBroadcastBlockHashes bool
6060
BroadcastWorkers int
61+
TxBatchTimeout time.Duration
6162
ShouldRunPprof bool
6263
PprofPort uint
6364
ShouldRunPrometheus bool
@@ -219,6 +220,7 @@ var SensorCmd = &cobra.Command{
219220
ShouldBroadcastBlocks: inputSensorParams.ShouldBroadcastBlocks,
220221
ShouldBroadcastBlockHashes: inputSensorParams.ShouldBroadcastBlockHashes,
221222
BroadcastWorkers: inputSensorParams.BroadcastWorkers,
223+
TxBatchTimeout: inputSensorParams.TxBatchTimeout,
222224
})
223225

224226
opts := p2p.EthProtocolOptions{
@@ -494,6 +496,7 @@ will result in less chance of missing data but can significantly increase memory
494496
f.BoolVar(&inputSensorParams.ShouldBroadcastBlocks, "broadcast-blocks", false, "broadcast full blocks to peers")
495497
f.BoolVar(&inputSensorParams.ShouldBroadcastBlockHashes, "broadcast-block-hashes", false, "broadcast block hashes to peers")
496498
f.IntVar(&inputSensorParams.BroadcastWorkers, "broadcast-workers", 4, "number of concurrent broadcast workers")
499+
f.DurationVar(&inputSensorParams.TxBatchTimeout, "tx-batch-timeout", 500*time.Millisecond, "timeout for batching transactions before broadcast")
497500
f.BoolVar(&inputSensorParams.ShouldRunPprof, "pprof", false, "run pprof server")
498501
f.UintVar(&inputSensorParams.PprofPort, "pprof-port", 6060, "port pprof runs on")
499502
f.BoolVar(&inputSensorParams.ShouldRunPrometheus, "prom", true, "run Prometheus server")

p2p/conns.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type ConnsOptions struct {
3737
ShouldBroadcastBlocks bool
3838
ShouldBroadcastBlockHashes bool
3939
BroadcastWorkers int
40+
TxBatchTimeout time.Duration
4041
}
4142

4243
// Conns manages a collection of active peer connections for transaction broadcasting.
@@ -72,6 +73,9 @@ type Conns struct {
7273

7374
// txBroadcastCh is a buffered channel for async transaction broadcast
7475
txBroadcastCh chan types.Transactions
76+
77+
// txBatchTimeout is the timeout for batching transactions before broadcast
78+
txBatchTimeout time.Duration
7579
}
7680

7781
// NewConns creates a new connection manager with a blocks cache.
@@ -82,6 +86,11 @@ func NewConns(opts ConnsOptions) *Conns {
8286
oldest := &ds.Locked[*types.Header]{}
8387
oldest.Set(opts.Head.Block.Header())
8488

89+
txBatchTimeout := opts.TxBatchTimeout
90+
if txBatchTimeout <= 0 {
91+
txBatchTimeout = 500 * time.Millisecond
92+
}
93+
8594
c := &Conns{
8695
conns: make(map[string]*conn),
8796
blocks: ds.NewLRU[common.Hash, BlockCache](opts.BlocksCache),
@@ -95,6 +104,7 @@ func NewConns(opts ConnsOptions) *Conns {
95104
shouldBroadcastBlocks: opts.ShouldBroadcastBlocks,
96105
shouldBroadcastBlockHashes: opts.ShouldBroadcastBlockHashes,
97106
txBroadcastCh: make(chan types.Transactions, 100000),
107+
txBatchTimeout: txBatchTimeout,
98108
}
99109

100110
workers := opts.BroadcastWorkers
@@ -311,7 +321,7 @@ func (c *Conns) pullTxBatch() (types.Transactions, []common.Hash) {
311321
}
312322

313323
// Drain more until max size or timeout
314-
timer := time.NewTimer(100 * time.Millisecond)
324+
timer := time.NewTimer(c.txBatchTimeout)
315325
defer timer.Stop()
316326

317327
for batchSize < maxTxPacketSize {

0 commit comments

Comments
 (0)