Skip to content

Commit 1a9ca49

Browse files
minhd-vu and claude authored
feat(sensor): broadcast blocks, txs, and hashes (#727)
- Add transaction and block broadcasting to connected peers
- Implement LRU cache and BloomSet data structures for efficient tracking
- Add per-peer known tx/block tracking to avoid duplicate sends
- Support async transaction hash announcements
- Add protocol version negotiation
- Fix goroutine leak in broadcast handlers
- Add pprof lock profiling support

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent c7646fe commit 1a9ca49

13 files changed

Lines changed: 1558 additions & 404 deletions

File tree

cmd/p2p/sensor/api.go

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@ import (
1717
// (number of p2p messages), along with connection timing information.
1818
type peerData struct {
1919
Name string `json:"name"`
20+
ProtocolVersion uint `json:"protocol_version"`
2021
Received p2p.MessageCount `json:"received"`
2122
Sent p2p.MessageCount `json:"sent"`
2223
PacketsReceived p2p.MessageCount `json:"packets_received"`
@@ -85,6 +86,7 @@ func handleAPI(server *ethp2p.Server, conns *p2p.Conns) {
8586

8687
peers[url] = peerData{
8788
Name: conns.GetPeerName(peerID),
89+
ProtocolVersion: conns.GetPeerVersion(peerID),
8890
Received: messages.Received,
8991
Sent: messages.Sent,
9092
PacketsReceived: messages.PacketsReceived,

cmd/p2p/sensor/sensor.go

Lines changed: 100 additions & 58 deletions
Original file line number | Diff line number | Diff line change
@@ -6,8 +6,8 @@ import (
66
_ "embed"
77
"errors"
88
"fmt"
9-
"os"
109
"os/signal"
10+
"runtime"
1111
"syscall"
1212
"time"
1313

@@ -29,6 +29,7 @@ import (
2929
"github.com/rs/zerolog/log"
3030
"github.com/spf13/cobra"
3131

32+
ds "github.com/0xPolygon/polygon-cli/p2p/datastructures"
3233
"github.com/0xPolygon/polygon-cli/flag"
3334
"github.com/0xPolygon/polygon-cli/p2p"
3435
"github.com/0xPolygon/polygon-cli/p2p/database"
@@ -52,6 +53,10 @@ type (
5253
ShouldWriteTransactions bool
5354
ShouldWriteTransactionEvents bool
5455
ShouldWritePeers bool
56+
ShouldBroadcastTx bool
57+
ShouldBroadcastTxHashes bool
58+
ShouldBroadcastBlocks bool
59+
ShouldBroadcastBlockHashes bool
5560
ShouldRunPprof bool
5661
PprofPort uint
5762
ShouldRunPrometheus bool
@@ -71,9 +76,12 @@ type (
7176
DiscoveryDNS string
7277
Database string
7378
NoDiscovery bool
74-
RequestsCache p2p.CacheOptions
75-
ParentsCache p2p.CacheOptions
76-
BlocksCache p2p.CacheOptions
79+
RequestsCache ds.LRUOptions
80+
ParentsCache ds.LRUOptions
81+
BlocksCache ds.LRUOptions
82+
TxsCache ds.LRUOptions
83+
KnownTxsBloom ds.BloomSetOptions
84+
KnownBlocksMax int
7785

7886
bootnodes []*enode.Node
7987
staticNodes []*enode.Node
@@ -166,7 +174,10 @@ var SensorCmd = &cobra.Command{
166174
return nil
167175
},
168176
RunE: func(cmd *cobra.Command, args []string) error {
169-
db, err := newDatabase(cmd.Context())
177+
ctx, stop := signal.NotifyContext(cmd.Context(), syscall.SIGINT, syscall.SIGTERM)
178+
defer stop()
179+
180+
db, err := newDatabase(ctx)
170181
if err != nil {
171182
return err
172183
}
@@ -195,21 +206,32 @@ var SensorCmd = &cobra.Command{
195206
// Create peer connection manager for broadcasting transactions
196207
// and managing the global blocks cache
197208
conns := p2p.NewConns(p2p.ConnsOptions{
198-
BlocksCache: inputSensorParams.BlocksCache,
199-
Head: head,
209+
BlocksCache: inputSensorParams.BlocksCache,
210+
TxsCache: inputSensorParams.TxsCache,
211+
KnownTxsBloom: inputSensorParams.KnownTxsBloom,
212+
KnownBlocksMax: inputSensorParams.KnownBlocksMax,
213+
Head: head,
214+
ShouldBroadcastTx: inputSensorParams.ShouldBroadcastTx,
215+
ShouldBroadcastTxHashes: inputSensorParams.ShouldBroadcastTxHashes,
216+
ShouldBroadcastBlocks: inputSensorParams.ShouldBroadcastBlocks,
217+
ShouldBroadcastBlockHashes: inputSensorParams.ShouldBroadcastBlockHashes,
200218
})
201219

202220
opts := p2p.EthProtocolOptions{
203-
Context: cmd.Context(),
204-
Database: db,
205-
GenesisHash: common.HexToHash(inputSensorParams.GenesisHash),
206-
RPC: inputSensorParams.RPC,
207-
SensorID: inputSensorParams.SensorID,
208-
NetworkID: inputSensorParams.NetworkID,
209-
Conns: conns,
210-
ForkID: forkid.ID{Hash: [4]byte(inputSensorParams.ForkID)},
211-
RequestsCache: inputSensorParams.RequestsCache,
212-
ParentsCache: inputSensorParams.ParentsCache,
221+
Context: ctx,
222+
Database: db,
223+
GenesisHash: common.HexToHash(inputSensorParams.GenesisHash),
224+
RPC: inputSensorParams.RPC,
225+
SensorID: inputSensorParams.SensorID,
226+
NetworkID: inputSensorParams.NetworkID,
227+
Conns: conns,
228+
ForkID: forkid.ID{Hash: [4]byte(inputSensorParams.ForkID)},
229+
RequestsCache: inputSensorParams.RequestsCache,
230+
ParentsCache: inputSensorParams.ParentsCache,
231+
ShouldBroadcastTx: inputSensorParams.ShouldBroadcastTx,
232+
ShouldBroadcastTxHashes: inputSensorParams.ShouldBroadcastTxHashes,
233+
ShouldBroadcastBlocks: inputSensorParams.ShouldBroadcastBlocks,
234+
ShouldBroadcastBlockHashes: inputSensorParams.ShouldBroadcastBlockHashes,
213235
}
214236

215237
config := ethp2p.Config{
@@ -242,20 +264,14 @@ var SensorCmd = &cobra.Command{
242264
if err = server.Start(); err != nil {
243265
return err
244266
}
245-
defer server.Stop()
267+
defer stopServer(&server)
246268

247269
events := make(chan *ethp2p.PeerEvent)
248270
sub := server.SubscribeEvents(events)
249271
defer sub.Unsubscribe()
250272

251-
ticker := time.NewTicker(2 * time.Second) // Ticker for recurring tasks every 2 seconds.
252-
ticker1h := time.NewTicker(time.Hour) // Ticker for running DNS discovery every hour.
273+
ticker := time.NewTicker(2 * time.Second)
253274
defer ticker.Stop()
254-
defer ticker1h.Stop()
255-
256-
dnsLock := make(chan struct{}, 1)
257-
signals := make(chan os.Signal, 1)
258-
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
259275

260276
if inputSensorParams.ShouldRunPprof {
261277
go handlePprof()
@@ -266,34 +282,17 @@ var SensorCmd = &cobra.Command{
266282
}
267283

268284
go handleAPI(&server, conns)
269-
270-
// Start the RPC server for receiving transactions
271285
go handleRPC(conns, inputSensorParams.NetworkID)
272-
273-
// Run DNS discovery immediately at startup.
274-
go handleDNSDiscovery(&server, dnsLock)
286+
go handleDNSDiscovery(&server)
275287

276288
for {
277289
select {
278290
case <-ticker.C:
279291
peersGauge.Set(float64(server.PeerCount()))
280-
db.WritePeers(cmd.Context(), server.Peers(), time.Now())
281-
292+
db.WritePeers(ctx, server.Peers(), time.Now())
282293
metrics.Update(conns.HeadBlock().Block, conns.OldestBlock())
283-
284-
urls := []string{}
285-
for _, peer := range server.Peers() {
286-
urls = append(urls, peer.Node().URLv4())
287-
}
288-
289-
if err := p2p.WritePeers(inputSensorParams.NodesFile, urls); err != nil {
290-
log.Error().Err(err).Msg("Failed to write nodes to file")
291-
}
292-
case <-ticker1h.C:
293-
go handleDNSDiscovery(&server, dnsLock)
294-
case <-signals:
295-
// This gracefully stops the sensor so that the peers can be written to
296-
// the nodes file.
294+
writePeers(server.Peers())
295+
case <-ctx.Done():
297296
log.Info().Msg("Stopping sensor...")
298297
return nil
299298
case event := <-events:
@@ -305,11 +304,43 @@ var SensorCmd = &cobra.Command{
305304
},
306305
}
307306

307+
// writePeers writes the enode URLs of connected peers to the nodes file.
308+
func writePeers(peers []*ethp2p.Peer) {
309+
urls := make([]string, 0, len(peers))
310+
for _, peer := range peers {
311+
urls = append(urls, peer.Node().URLv4())
312+
}
313+
314+
if err := p2p.WritePeers(inputSensorParams.NodesFile, urls); err != nil {
315+
log.Error().Err(err).Msg("Failed to write nodes to file")
316+
}
317+
}
318+
319+
// stopServer stops the p2p server with a timeout to avoid hanging on shutdown.
320+
// This is necessary because go-ethereum's discovery shutdown can deadlock.
321+
func stopServer(server *ethp2p.Server) {
322+
done := make(chan struct{})
323+
324+
go func() {
325+
server.Stop()
326+
close(done)
327+
}()
328+
329+
select {
330+
case <-done:
331+
case <-time.After(5 * time.Second):
332+
}
333+
}
334+
308335
// handlePprof starts a server for performance profiling using pprof on the
309336
// specified port. This allows for real-time monitoring and analysis of the
310337
// sensor's performance. The port number is configured through
311338
// inputSensorParams.PprofPort. An error is logged if the server fails to start.
312339
func handlePprof() {
340+
// Enable mutex and block profiling to detect lock contention.
341+
runtime.SetMutexProfileFraction(1)
342+
runtime.SetBlockProfileRate(1)
343+
313344
addr := fmt.Sprintf(":%d", inputSensorParams.PprofPort)
314345
if err := http.ListenAndServe(addr, nil); err != nil {
315346
log.Error().Err(err).Msg("Failed to start pprof")
@@ -331,20 +362,24 @@ func handlePrometheus() {
331362

332363
// handleDNSDiscovery performs DNS-based peer discovery and adds new peers to
333364
// the p2p server. It uses an iterator to discover peers incrementally rather
334-
// than loading all nodes at once. The lock channel prevents concurrent runs.
335-
func handleDNSDiscovery(server *ethp2p.Server, lock chan struct{}) {
365+
// than loading all nodes at once. Runs immediately and then hourly.
366+
func handleDNSDiscovery(server *ethp2p.Server) {
336367
if len(inputSensorParams.DiscoveryDNS) == 0 {
337368
return
338369
}
339370

340-
select {
341-
case lock <- struct{}{}:
342-
defer func() { <-lock }()
343-
default:
344-
log.Warn().Msg("DNS discovery already running, skipping")
345-
return
371+
discoverPeers(server)
372+
373+
ticker := time.NewTicker(time.Hour)
374+
defer ticker.Stop()
375+
376+
for range ticker.C {
377+
discoverPeers(server)
346378
}
379+
}
347380

381+
// discoverPeers performs a single DNS discovery round.
382+
func discoverPeers(server *ethp2p.Server) {
348383
log.Info().
349384
Str("discovery-dns", inputSensorParams.DiscoveryDNS).
350385
Msg("Starting DNS discovery")
@@ -357,17 +392,13 @@ func handleDNSDiscovery(server *ethp2p.Server, lock chan struct{}) {
357392
}
358393
defer iter.Close()
359394

360-
// Add DNS-discovered peers using the iterator.
361395
count := 0
362396
for iter.Next() {
363397
node := iter.Node()
364398
log.Debug().
365399
Str("enode", node.URLv4()).
366400
Msg("Discovered peer through DNS")
367401

368-
// Add the peer to the static node set. The server itself handles whether to
369-
// connect to the peer if it's already connected. If a node is part of the
370-
// static peer set, the server will handle reconnecting after disconnects.
371402
server.AddPeer(node)
372403
count++
373404
}
@@ -450,6 +481,10 @@ will result in less chance of missing data but can significantly increase memory
450481
f.BoolVar(&inputSensorParams.ShouldWriteTransactionEvents, "write-tx-events", true,
451482
`write transaction events to database (this option can significantly increase CPU and memory usage)`)
452483
f.BoolVar(&inputSensorParams.ShouldWritePeers, "write-peers", true, "write peers to database")
484+
f.BoolVar(&inputSensorParams.ShouldBroadcastTx, "broadcast-txs", false, "broadcast full transactions to peers")
485+
f.BoolVar(&inputSensorParams.ShouldBroadcastTxHashes, "broadcast-tx-hashes", false, "broadcast transaction hashes to peers")
486+
f.BoolVar(&inputSensorParams.ShouldBroadcastBlocks, "broadcast-blocks", false, "broadcast full blocks to peers")
487+
f.BoolVar(&inputSensorParams.ShouldBroadcastBlockHashes, "broadcast-block-hashes", false, "broadcast block hashes to peers")
453488
f.BoolVar(&inputSensorParams.ShouldRunPprof, "pprof", false, "run pprof server")
454489
f.UintVar(&inputSensorParams.PprofPort, "pprof-port", 6060, "port pprof runs on")
455490
f.BoolVar(&inputSensorParams.ShouldRunPrometheus, "prom", true, "run Prometheus server")
@@ -483,4 +518,11 @@ will result in less chance of missing data but can significantly increase memory
483518
f.DurationVar(&inputSensorParams.ParentsCache.TTL, "parents-cache-ttl", 5*time.Minute, "time to live for parent hash cache entries (0 for no expiration)")
484519
f.IntVar(&inputSensorParams.BlocksCache.MaxSize, "max-blocks", 1024, "maximum blocks to track across all peers (0 for no limit)")
485520
f.DurationVar(&inputSensorParams.BlocksCache.TTL, "blocks-cache-ttl", 10*time.Minute, "time to live for block cache entries (0 for no expiration)")
521+
f.IntVar(&inputSensorParams.TxsCache.MaxSize, "max-txs", 32768, "maximum transactions to cache for serving to peers (0 for no limit)")
522+
f.DurationVar(&inputSensorParams.TxsCache.TTL, "txs-cache-ttl", 10*time.Minute, "time to live for transaction cache entries (0 for no expiration)")
523+
f.UintVar(&inputSensorParams.KnownTxsBloom.Size, "known-txs-bloom-size", 327680,
524+
`bloom filter size in bits for tracking known transactions per peer (default ~40KB per filter,
525+
optimized for ~32K elements with ~1% false positive rate)`)
526+
f.UintVar(&inputSensorParams.KnownTxsBloom.HashCount, "known-txs-bloom-hashes", 7, "number of hash functions for known txs bloom filter")
527+
f.IntVar(&inputSensorParams.KnownBlocksMax, "max-known-blocks", 1024, "maximum block hashes to track per peer (0 for no limit)")
486528
}

doc/polycli_p2p_sensor.md

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -94,6 +94,10 @@ polycli p2p sensor amoy-nodes.json \
9494
--api-port uint port API server will listen on (default 8080)
9595
--blocks-cache-ttl duration time to live for block cache entries (0 for no expiration) (default 10m0s)
9696
-b, --bootnodes string comma separated nodes used for bootstrapping
97+
--broadcast-block-hashes broadcast block hashes to peers
98+
--broadcast-blocks broadcast full blocks to peers
99+
--broadcast-tx-hashes broadcast transaction hashes to peers
100+
--broadcast-txs broadcast full transactions to peers
97101
--database string which database to persist data to, options are:
98102
- datastore (GCP Datastore)
99103
- json (output to stdout)
@@ -107,12 +111,17 @@ polycli p2p sensor amoy-nodes.json \
107111
-h, --help help for sensor
108112
--key string hex-encoded private key (cannot be set with --key-file)
109113
-k, --key-file string private key file (cannot be set with --key)
114+
--known-txs-bloom-hashes uint number of hash functions for known txs bloom filter (default 7)
115+
--known-txs-bloom-size uint bloom filter size in bits for tracking known transactions per peer (default ~40KB per filter,
116+
optimized for ~32K elements with ~1% false positive rate) (default 327680)
110117
--max-blocks int maximum blocks to track across all peers (0 for no limit) (default 1024)
111118
-D, --max-db-concurrency int maximum number of concurrent database operations to perform (increasing this
112119
will result in less chance of missing data but can significantly increase memory usage) (default 10000)
120+
--max-known-blocks int maximum block hashes to track per peer (0 for no limit) (default 1024)
113121
--max-parents int maximum parent block hashes to track per peer (0 for no limit) (default 1024)
114122
-m, --max-peers int maximum number of peers to connect to (default 2000)
115123
--max-requests int maximum request IDs to track per peer (0 for no limit) (default 2048)
124+
--max-txs int maximum transactions to cache for serving to peers (0 for no limit) (default 32768)
116125
--nat string NAT port mapping mechanism (any|none|upnp|pmp|pmp:<IP>|extip:<IP>) (default "any")
117126
-n, --network-id uint filter discovered nodes by this network ID
118127
--no-discovery disable P2P peer discovery
@@ -130,6 +139,7 @@ polycli p2p sensor amoy-nodes.json \
130139
--static-nodes string static nodes file
131140
--trusted-nodes string trusted nodes file
132141
--ttl duration time to live (default 336h0m0s)
142+
--txs-cache-ttl duration time to live for transaction cache entries (0 for no expiration) (default 10m0s)
133143
--write-block-events write block events to database (default true)
134144
-B, --write-blocks write blocks to database (default true)
135145
--write-peers write peers to database (default true)

go.mod

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -100,7 +100,7 @@ require (
100100
github.com/cockroachdb/redact v1.1.5 // indirect
101101
github.com/consensys/gnark-crypto v0.19.2 // indirect
102102
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
103-
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
103+
github.com/deckarep/golang-set/v2 v2.6.0
104104
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect
105105
github.com/fsnotify/fsnotify v1.9.0 // indirect
106106
github.com/getsentry/sentry-go v0.29.1 // indirect
@@ -114,7 +114,7 @@ require (
114114
github.com/googleapis/enterprise-certificate-proxy v0.3.12 // indirect
115115
github.com/googleapis/gax-go/v2 v2.17.0 // indirect
116116
github.com/gorilla/websocket v1.5.3 // indirect
117-
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
117+
github.com/holiman/bloomfilter/v2 v2.0.3
118118
github.com/holiman/uint256 v1.3.2
119119
github.com/huin/goupnp v1.3.0 // indirect
120120
github.com/inconshreveable/mousetrap v1.1.0 // indirect

0 commit comments

Comments
 (0)