@@ -69,11 +69,10 @@ type Conns struct {
6969 shouldBroadcastBlockHashes bool
7070
7171 // txBroadcastQueue is an unbounded queue of transactions for async broadcast
72- txBroadcastQueue []types.Transactions
73- txBroadcastMu sync.Mutex
74- txBroadcastCond * sync.Cond
75- // closeCh signals the broadcast worker to stop
76- closeCh chan struct {}
72+ txBroadcastQueue []types.Transactions
73+ txBroadcastMu sync.Mutex
74+ txBroadcastCond * sync.Cond
75+ txBroadcastClosed bool
7776}
7877
7978// NewConns creates a new connection manager with a blocks cache.
@@ -96,7 +95,6 @@ func NewConns(opts ConnsOptions) *Conns {
9695 shouldBroadcastTxHashes : opts .ShouldBroadcastTxHashes ,
9796 shouldBroadcastBlocks : opts .ShouldBroadcastBlocks ,
9897 shouldBroadcastBlockHashes : opts .ShouldBroadcastBlockHashes ,
99- closeCh : make (chan struct {}),
10098 }
10199 c .txBroadcastCond = sync .NewCond (& c .txBroadcastMu )
102100 go c .txBroadcastLoop ()
@@ -272,13 +270,10 @@ func (c *Conns) broadcastTxs(txs types.Transactions, hashes []common.Hash, peers
272270// While broadcasting, more transactions accumulate in the queue for the next batch.
273271func (c * Conns ) txBroadcastLoop () {
274272 for {
275- batch , done := c .pullTxBatch ()
276- if done {
273+ batch := c .pullTxBatch ()
274+ if batch == nil {
277275 return
278276 }
279- if len (batch ) == 0 {
280- continue
281- }
282277
283278 peers := c .snapshotPeers ()
284279 if len (peers ) == 0 {
@@ -291,25 +286,22 @@ func (c *Conns) txBroadcastLoop() {
291286 }
292287
293288 c .broadcastTxs (batch , hashes , peers )
294- log .Info ().Int ("txs" , len (batch )).Int ("peers" , len (peers )).Msg ("Broadcasted batch" )
295289 }
296290}
297291
298292// pullTxBatch pulls transactions from the queue up to maxTxPacketSize.
299- // Returns the batch and whether the worker should exit.
300- func (c * Conns ) pullTxBatch () ( types.Transactions , bool ) {
293+ // Returns nil when the worker should exit.
294+ func (c * Conns ) pullTxBatch () types.Transactions {
301295 c .txBroadcastMu .Lock ()
302296 defer c .txBroadcastMu .Unlock ()
303297
304298 // Wait for transactions or close signal
305- for len (c .txBroadcastQueue ) == 0 {
306- select {
307- case <- c .closeCh :
308- return nil , true
309- default :
310- }
299+ for len (c .txBroadcastQueue ) == 0 && ! c .txBroadcastClosed {
311300 c .txBroadcastCond .Wait ()
312301 }
302+ if c .txBroadcastClosed {
303+ return nil
304+ }
313305
314306 var batch types.Transactions
315307 var batchSize uint64
@@ -325,13 +317,8 @@ func (c *Conns) pullTxBatch() (types.Transactions, bool) {
325317
326318 // If adding this tx would exceed max size and we have something, stop
327319 if len (batch ) > 0 && batchSize + txSize > maxTxPacketSize {
328- // Put remaining txs back at front of queue
329- if len (txs ) > 0 {
330- c .txBroadcastQueue [0 ] = txs
331- } else {
332- c .txBroadcastQueue = c .txBroadcastQueue [1 :]
333- }
334- return batch , false
320+ c .txBroadcastQueue [0 ] = txs
321+ return batch
335322 }
336323
337324 batch = append (batch , tx )
@@ -343,7 +330,7 @@ func (c *Conns) pullTxBatch() (types.Transactions, bool) {
343330 c .txBroadcastQueue = c .txBroadcastQueue [1 :]
344331 }
345332
346- return batch , false
333+ return batch
347334}
348335
349336// EnqueueTxBroadcast adds transactions to the broadcast queue for async sending.
@@ -357,7 +344,9 @@ func (c *Conns) EnqueueTxBroadcast(txs types.Transactions) {
357344
358345// Close stops the broadcast worker.
359346func (c * Conns ) Close () {
360- close (c .closeCh )
347+ c .txBroadcastMu .Lock ()
348+ c .txBroadcastClosed = true
349+ c .txBroadcastMu .Unlock ()
361350 c .txBroadcastCond .Broadcast ()
362351}
363352
0 commit comments