Skip to content

Commit 5bad693

Browse files
committed
feat: fire and forget
1 parent acaa492 commit 5bad693

3 files changed

Lines changed: 51 additions & 90 deletions

File tree

cmd/p2p/sensor/rpc.go

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,14 @@ func handleRPC(conns *p2p.Conns, networkID uint64) {
138138

139139
params.counter.WithLabelValues(req.Method, "false").Inc()
140140

141-
// Broadcast any transactions (always broadcast RPC-submitted transactions)
141+
// Broadcast transactions asynchronously to avoid blocking the RPC response
142142
if len(txs) > 0 {
143-
log.Info().Str("hash", txs[0].Hash().Hex()).Msg("Broadcasting transaction")
144-
count := params.conns.BroadcastTxsAlways(txs)
145-
log.Info().Str("hash", txs[0].Hash().Hex()).Int("peers", count).Msg("Transaction broadcast complete")
143+
go func(txs types.Transactions) {
144+
hash := txs[0].Hash().Hex()
145+
log.Info().Str("hash", hash).Msg("Broadcasting transaction")
146+
count := params.conns.BroadcastTxsAlways(txs)
147+
log.Info().Str("hash", hash).Int("peers", count).Msg("Transaction broadcast complete")
148+
}(txs)
146149
}
147150

148151
// Write response
@@ -288,18 +291,13 @@ func handleBatchRequest(w http.ResponseWriter, r *http.Request, body []byte, par
288291
}
289292
}
290293

294+
// Broadcast transactions asynchronously to avoid blocking the RPC response
291295
if len(txs) > 0 {
292-
log.Info().
293-
Int("txs", len(txs)).
294-
Int("requests", len(requests)).
295-
Msg("Broadcasting batch of transactions")
296-
297-
count := params.conns.BroadcastTxsAlways(txs)
298-
299-
log.Info().
300-
Int("txs", len(txs)).
301-
Int("peers", count).
302-
Msg("Batch broadcast complete")
296+
go func(txs types.Transactions) {
297+
log.Info().Int("txs", len(txs)).Msg("Broadcasting batch of transactions")
298+
count := params.conns.BroadcastTxsAlways(txs)
299+
log.Info().Int("txs", len(txs)).Int("peers", count).Msg("Batch broadcast complete")
300+
}(txs)
303301
}
304302

305303
if len(indices) > 0 {

loadtest/preconf.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ func (pt *PreconfTracker) Track(txHash common.Hash) {
102102
var preconfError error
103103
var preconfDuration time.Duration
104104
wg.Go(func() {
105-
106105
preconfStartTime := time.Now()
107106
defer func() {
108107
preconfDuration = time.Since(preconfStartTime)
@@ -116,7 +115,6 @@ func (pt *PreconfTracker) Track(txHash common.Hash) {
116115
var receiptError error
117116
var receiptDuration time.Duration
118117
wg.Go(func() {
119-
120118
time.Sleep(100 * time.Millisecond)
121119

122120
receiptTime := time.Now()

p2p/conns.go

Lines changed: 38 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -154,13 +154,10 @@ func (c *Conns) BroadcastTxs(txs types.Transactions) int {
154154
var count atomic.Int32
155155
var wg sync.WaitGroup
156156

157-
for _, cn := range peers {
158-
wg.Add(1)
159-
go func(cn *conn) {
160-
defer wg.Done()
161-
157+
for _, peer := range peers {
158+
wg.Go(func() {
162159
// Filter transactions this peer doesn't know about using batch bloom operation
163-
unknownHashes := cn.filterUnknownTxHashes(hashes)
160+
unknownHashes := peer.filterUnknownTxHashes(hashes)
164161
if len(unknownHashes) == 0 {
165162
return
166163
}
@@ -172,32 +169,25 @@ func (c *Conns) BroadcastTxs(txs types.Transactions) int {
172169
unknownTxs = append(unknownTxs, tx)
173170
}
174171
}
175-
176172
if len(unknownTxs) == 0 {
177173
return
178174
}
179175

180-
// Send as TransactionsPacket
181176
rawList, err := rlp.EncodeToRawList([]*types.Transaction(unknownTxs))
182177
if err != nil {
183-
cn.logger.Debug().
184-
Err(err).
185-
Msg("Failed to encode transactions")
178+
peer.logger.Debug().Err(err).Msg("Failed to encode transactions")
186179
return
187180
}
188181
packet := &eth.TransactionsPacket{RawList: rawList}
189-
cn.countMsgSent(packet.Name(), float64(len(unknownTxs)))
190-
if err := ethp2p.Send(cn.rw, eth.TransactionsMsg, packet); err != nil {
191-
cn.logger.Debug().
192-
Err(err).
193-
Msg("Failed to send transactions")
182+
peer.countMsgSent(packet.Name(), float64(len(unknownTxs)))
183+
if err := ethp2p.Send(peer.rw, eth.TransactionsMsg, packet); err != nil {
184+
peer.logger.Debug().Err(err).Msg("Failed to send transactions")
194185
return
195186
}
196187

197-
// Mark transactions as known for this peer using batch bloom operation
198-
cn.addKnownTxHashes(unknownHashes)
188+
peer.addKnownTxHashes(unknownHashes)
199189
count.Add(1)
200-
}(cn)
190+
})
201191
}
202192

203193
wg.Wait()
@@ -213,17 +203,13 @@ func (c *Conns) BroadcastTxs(txs types.Transactions) int {
213203
return finalCount
214204
}
215205

216-
// BroadcastTxsAlways broadcasts transactions to all connected peers regardless
217-
// of the ShouldBroadcastTx configuration. This is used for transactions
218-
// submitted via RPC that must always be broadcast to the network.
219-
// Unlike BroadcastTxs, this does not filter by known transactions since
220-
// RPC-submitted transactions are always new.
206+
// BroadcastTxsAlways broadcasts RPC-submitted transactions to all peers unconditionally.
207+
// Always uses TransactionsMsg (matching Bor behavior - PooledTransactionsMsg is response-only).
221208
func (c *Conns) BroadcastTxsAlways(txs types.Transactions) int {
222209
if len(txs) == 0 {
223210
return 0
224211
}
225212

226-
// Pre-compute transaction hashes once to avoid redundant Keccak256 computations
227213
hashes := make([]common.Hash, len(txs))
228214
for i, tx := range txs {
229215
hashes[i] = tx.Hash()
@@ -234,51 +220,43 @@ func (c *Conns) BroadcastTxsAlways(txs types.Transactions) int {
234220
return 0
235221
}
236222

237-
// Broadcast concurrently to all peers
223+
return c.broadcastTxs(txs, hashes, peers)
224+
}
225+
226+
// broadcastTxs sends transactions directly via TransactionsMsg.
227+
func (c *Conns) broadcastTxs(txs types.Transactions, hashes []common.Hash, peers []*conn) int {
238228
var count atomic.Int32
239229
var wg sync.WaitGroup
240230

241-
for _, cn := range peers {
242-
wg.Add(1)
243-
go func(cn *conn) {
244-
defer wg.Done()
245-
246-
// Send as TransactionsPacket
231+
for _, peer := range peers {
232+
wg.Go(func() {
247233
rawList, err := rlp.EncodeToRawList([]*types.Transaction(txs))
248234
if err != nil {
249-
cn.logger.Debug().
250-
Err(err).
251-
Msg("Failed to encode transactions")
235+
peer.logger.Debug().Err(err).Msg("Failed to encode transactions")
252236
return
253237
}
254238
packet := &eth.TransactionsPacket{RawList: rawList}
255-
cn.countMsgSent(packet.Name(), float64(len(txs)))
256-
if err := ethp2p.Send(cn.rw, eth.TransactionsMsg, packet); err != nil {
257-
cn.logger.Debug().
258-
Err(err).
259-
Msg("Failed to send transactions")
239+
peer.countMsgSent(packet.Name(), float64(len(txs)))
240+
if err := ethp2p.Send(peer.rw, eth.TransactionsMsg, packet); err != nil {
241+
peer.logger.Debug().Err(err).Msg("Failed to send transactions")
260242
return
261243
}
262-
263-
// Mark transactions as known for this peer using batch bloom operation
264-
cn.addKnownTxHashes(hashes)
244+
peer.addKnownTxHashes(hashes)
265245
count.Add(1)
266-
}(cn)
246+
})
267247
}
268248

269249
wg.Wait()
270250

271-
finalCount := int(count.Load())
272-
if finalCount > 0 {
273-
log.Debug().
274-
Int("peers", finalCount).
275-
Int("txs", len(txs)).
276-
Msg("Broadcasted transactions (always)")
251+
pc := int(count.Load())
252+
if pc > 0 {
253+
log.Debug().Int("peers", pc).Int("txs", len(txs)).Msg("Broadcasted transactions")
277254
}
278255

279-
return finalCount
256+
return pc
280257
}
281258

259+
282260
// BroadcastTxHashes enqueues transaction hashes to per-peer broadcast queues.
283261
// Each peer has a dedicated goroutine that drains the queue and batches sends.
284262
// Returns the number of peers the hashes were enqueued to.
@@ -333,35 +311,22 @@ func (c *Conns) BroadcastBlock(block *types.Block, td *big.Int) int {
333311
var count atomic.Int32
334312
var wg sync.WaitGroup
335313

336-
for _, cn := range peers {
337-
wg.Add(1)
338-
go func(cn *conn) {
339-
defer wg.Done()
340-
341-
// Skip if peer already knows about this block
342-
if cn.hasKnownBlock(hash) {
314+
for _, peer := range peers {
315+
wg.Go(func() {
316+
if peer.hasKnownBlock(hash) {
343317
return
344318
}
345319

346-
// Send NewBlockPacket
347-
packet := eth.NewBlockPacket{
348-
Block: block,
349-
TD: td,
350-
}
351-
352-
cn.countMsgSent(packet.Name(), 1)
353-
if err := ethp2p.Send(cn.rw, eth.NewBlockMsg, &packet); err != nil {
354-
cn.logger.Debug().
355-
Err(err).
356-
Uint64("number", block.Number().Uint64()).
357-
Msg("Failed to send block")
320+
packet := eth.NewBlockPacket{Block: block, TD: td}
321+
peer.countMsgSent(packet.Name(), 1)
322+
if err := ethp2p.Send(peer.rw, eth.NewBlockMsg, &packet); err != nil {
323+
peer.logger.Debug().Err(err).Uint64("number", block.Number().Uint64()).Msg("Failed to send block")
358324
return
359325
}
360326

361-
// Mark block as known for this peer
362-
cn.addKnownBlock(hash)
327+
peer.addKnownBlock(hash)
363328
count.Add(1)
364-
}(cn)
329+
})
365330
}
366331

367332
wg.Wait()

0 commit comments

Comments
 (0)