Skip to content

Commit ada1ccc

Browse files
committed
fix: max out blocks
1 parent 3e1c631 commit ada1ccc

2 files changed

Lines changed: 109 additions & 18 deletions

File tree

cmd/p2p/sensor/rpc.go

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -138,16 +138,9 @@ func handleRPC(conns *p2p.Conns, networkID uint64) {
138138

139139
params.counter.WithLabelValues(req.Method, "false").Inc()
140140

141-
// Broadcast transactions asynchronously to avoid blocking the RPC response
141+
// Enqueue transactions for async broadcast
142142
if len(txs) > 0 {
143-
go func(txs types.Transactions) {
144-
hash := txs[0].Hash().Hex()
145-
count := params.conns.BroadcastTxsAlways(txs)
146-
log.Debug().
147-
Str("hash", hash).
148-
Int("peers", count).
149-
Msg("Broadcasted transaction")
150-
}(txs)
143+
params.conns.EnqueueTxBroadcast(txs)
151144
}
152145

153146
// Write response
@@ -293,15 +286,9 @@ func handleBatchRequest(w http.ResponseWriter, r *http.Request, body []byte, par
293286
}
294287
}
295288

296-
// Broadcast transactions asynchronously to avoid blocking the RPC response
289+
// Enqueue transactions for async broadcast
297290
if len(txs) > 0 {
298-
go func(txs types.Transactions) {
299-
count := params.conns.BroadcastTxsAlways(txs)
300-
log.Debug().
301-
Int("txs", len(txs)).
302-
Int("peers", count).
303-
Msg("Broadcasted transaction batch")
304-
}(txs)
291+
params.conns.EnqueueTxBroadcast(txs)
305292
}
306293

307294
if len(indices) > 0 {

p2p/conns.go

Lines changed: 105 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,13 @@ type Conns struct {
6767
shouldBroadcastTxHashes bool
6868
shouldBroadcastBlocks bool
6969
shouldBroadcastBlockHashes bool
70+
71+
// txBroadcastQueue is an unbounded queue of transactions for async broadcast
72+
txBroadcastQueue []types.Transactions
73+
txBroadcastMu sync.Mutex
74+
txBroadcastCond *sync.Cond
75+
// closeCh signals the broadcast worker to stop
76+
closeCh chan struct{}
7077
}
7178

7279
// NewConns creates a new connection manager with a blocks cache.
@@ -77,7 +84,7 @@ func NewConns(opts ConnsOptions) *Conns {
7784
oldest := &ds.Locked[*types.Header]{}
7885
oldest.Set(opts.Head.Block.Header())
7986

80-
return &Conns{
87+
c := &Conns{
8188
conns: make(map[string]*conn),
8289
blocks: ds.NewLRU[common.Hash, BlockCache](opts.BlocksCache),
8390
txs: ds.NewLRU[common.Hash, *types.Transaction](opts.TxsCache),
@@ -89,7 +96,11 @@ func NewConns(opts ConnsOptions) *Conns {
8996
shouldBroadcastTxHashes: opts.ShouldBroadcastTxHashes,
9097
shouldBroadcastBlocks: opts.ShouldBroadcastBlocks,
9198
shouldBroadcastBlockHashes: opts.ShouldBroadcastBlockHashes,
99+
closeCh: make(chan struct{}),
92100
}
101+
c.txBroadcastCond = sync.NewCond(&c.txBroadcastMu)
102+
go c.txBroadcastLoop()
103+
return c
93104
}
94105

95106
// Add adds a connection to the manager.
@@ -256,6 +267,99 @@ func (c *Conns) broadcastTxs(txs types.Transactions, hashes []common.Hash, peers
256267
return pc
257268
}
258269

270+
// txBroadcastLoop is the worker that drains the broadcast queue and sends
271+
// transactions to all peers, pulling only up to max packet size per iteration.
272+
// While broadcasting, more transactions accumulate in the queue for the next batch.
273+
func (c *Conns) txBroadcastLoop() {
274+
for {
275+
batch, done := c.pullTxBatch()
276+
if done {
277+
return
278+
}
279+
if len(batch) == 0 {
280+
continue
281+
}
282+
283+
peers := c.snapshotPeers()
284+
if len(peers) == 0 {
285+
continue
286+
}
287+
288+
hashes := make([]common.Hash, len(batch))
289+
for i, tx := range batch {
290+
hashes[i] = tx.Hash()
291+
}
292+
293+
c.broadcastTxs(batch, hashes, peers)
294+
log.Info().Int("txs", len(batch)).Int("peers", len(peers)).Msg("Broadcasted batch")
295+
}
296+
}
297+
298+
// pullTxBatch pulls transactions from the queue up to maxTxPacketSize.
299+
// Returns the batch and whether the worker should exit.
300+
func (c *Conns) pullTxBatch() (types.Transactions, bool) {
301+
c.txBroadcastMu.Lock()
302+
defer c.txBroadcastMu.Unlock()
303+
304+
// Wait for transactions or close signal
305+
for len(c.txBroadcastQueue) == 0 {
306+
select {
307+
case <-c.closeCh:
308+
return nil, true
309+
default:
310+
}
311+
c.txBroadcastCond.Wait()
312+
}
313+
314+
var batch types.Transactions
315+
var batchSize uint64
316+
317+
// Pull from queue until we hit max packet size
318+
for len(c.txBroadcastQueue) > 0 {
319+
txs := c.txBroadcastQueue[0]
320+
321+
// Try to fit transactions from this slice
322+
for len(txs) > 0 {
323+
tx := txs[0]
324+
txSize := tx.Size()
325+
326+
// If adding this tx would exceed max size and we have something, stop
327+
if len(batch) > 0 && batchSize+txSize > maxTxPacketSize {
328+
// Put remaining txs back at front of queue
329+
if len(txs) > 0 {
330+
c.txBroadcastQueue[0] = txs
331+
} else {
332+
c.txBroadcastQueue = c.txBroadcastQueue[1:]
333+
}
334+
return batch, false
335+
}
336+
337+
batch = append(batch, tx)
338+
batchSize += txSize
339+
txs = txs[1:]
340+
}
341+
342+
// Consumed entire slice, remove from queue
343+
c.txBroadcastQueue = c.txBroadcastQueue[1:]
344+
}
345+
346+
return batch, false
347+
}
348+
349+
// EnqueueTxBroadcast adds transactions to the broadcast queue for async sending.
350+
// The queue is unbounded; transactions are never dropped.
351+
func (c *Conns) EnqueueTxBroadcast(txs types.Transactions) {
352+
c.txBroadcastMu.Lock()
353+
c.txBroadcastQueue = append(c.txBroadcastQueue, txs)
354+
c.txBroadcastMu.Unlock()
355+
c.txBroadcastCond.Signal()
356+
}
357+
358+
// Close stops the broadcast worker.
359+
func (c *Conns) Close() {
360+
close(c.closeCh)
361+
c.txBroadcastCond.Broadcast()
362+
}
259363

260364
// BroadcastTxHashes enqueues transaction hashes to per-peer broadcast queues.
261365
// Each peer has a dedicated goroutine that drains the queue and batches sends.

0 commit comments

Comments
 (0)