From 87a5e2640deda43ac86eeffb09c7762731b40c3b Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Mon, 4 May 2026 16:07:21 -0400 Subject: [PATCH 1/7] fix: improve p2p sensor transaction writing consistency - Change handleNewPooledTransactionHashes condition from || to && so transactions are requested when either --write-txs or --write-tx-events is enabled (previously required both) - Preserve earliest TimeFirstSeen for transactions by checking existing records before writing, matching block behavior - Skip writing transactions that already exist with earlier timestamp - Add --write-first-tx-event flag for write-first-event-only behavior - Add SensorFirstSeen field to DatastoreTransaction for sensor attribution --- cmd/p2p/sensor/sensor.go | 34 ++++---- doc/polycli_p2p_sensor.md | 1 + p2p/database/database.go | 1 + p2p/database/datastore.go | 169 ++++++++++++++++++++++---------------- p2p/database/json.go | 5 ++ p2p/database/nodb.go | 5 ++ p2p/protocol.go | 2 +- 7 files changed, 130 insertions(+), 87 deletions(-) diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index 957dd09ed..02accd81f 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -48,9 +48,10 @@ type ( ShouldWriteBlocks bool ShouldWriteBlockEvents bool ShouldWriteFirstBlockEvent bool - ShouldWriteTransactions bool - ShouldWriteTransactionEvents bool - ShouldWritePeers bool + ShouldWriteTransactions bool + ShouldWriteTransactionEvents bool + ShouldWriteFirstTransactionEvent bool + ShouldWritePeers bool ShouldBroadcastTx bool ShouldBroadcastTxHashes bool ShouldBroadcastBlocks bool @@ -437,18 +438,19 @@ func newDatabase(ctx context.Context) (database.Database, error) { switch inputSensorParams.Database { case "datastore": return database.NewDatastore(ctx, database.DatastoreOptions{ - ProjectID: inputSensorParams.ProjectID, - DatabaseID: inputSensorParams.DatabaseID, - SensorID: inputSensorParams.SensorID, - ChainID: inputSensorParams.NetworkID, - MaxConcurrency: inputSensorParams.MaxDatabaseConcurrency, - ShouldWriteBlocks: inputSensorParams.ShouldWriteBlocks, - ShouldWriteBlockEvents: inputSensorParams.ShouldWriteBlockEvents, - ShouldWriteFirstBlockEvent: inputSensorParams.ShouldWriteFirstBlockEvent, - ShouldWriteTransactions: inputSensorParams.ShouldWriteTransactions, - ShouldWriteTransactionEvents: inputSensorParams.ShouldWriteTransactionEvents, - ShouldWritePeers: inputSensorParams.ShouldWritePeers, - TTL: inputSensorParams.TTL, + ProjectID: inputSensorParams.ProjectID, + DatabaseID: inputSensorParams.DatabaseID, + SensorID: inputSensorParams.SensorID, + ChainID: inputSensorParams.NetworkID, + MaxConcurrency: inputSensorParams.MaxDatabaseConcurrency, + ShouldWriteBlocks: inputSensorParams.ShouldWriteBlocks, + ShouldWriteBlockEvents: inputSensorParams.ShouldWriteBlockEvents, + ShouldWriteFirstBlockEvent: inputSensorParams.ShouldWriteFirstBlockEvent, + ShouldWriteTransactions: inputSensorParams.ShouldWriteTransactions, + ShouldWriteTransactionEvents: inputSensorParams.ShouldWriteTransactionEvents, + ShouldWriteFirstTransactionEvent: inputSensorParams.ShouldWriteFirstTransactionEvent, + ShouldWritePeers: inputSensorParams.ShouldWritePeers, + TTL: inputSensorParams.TTL, }), nil case "json": return database.NewJSONDatabase(database.JSONDatabaseOptions{ @@ -489,6 +491,8 @@ will result in less chance of missing data but can significantly increase memory `write transactions to database (this option can significantly increase CPU and memory usage)`) f.BoolVar(&inputSensorParams.ShouldWriteTransactionEvents, "write-tx-events", true, `write transaction events to database (this option can significantly increase CPU and memory usage)`) + f.BoolVar(&inputSensorParams.ShouldWriteFirstTransactionEvent, "write-first-tx-event", false, + "write one transaction event on first-seen only (requires --write-tx-events=false)") f.BoolVar(&inputSensorParams.ShouldWritePeers, "write-peers", true, "write peers to database") f.BoolVar(&inputSensorParams.ShouldBroadcastTx, "broadcast-txs", false, "broadcast full transactions to peers") f.BoolVar(&inputSensorParams.ShouldBroadcastTxHashes, "broadcast-tx-hashes", false, "broadcast transaction hashes to peers") diff --git a/doc/polycli_p2p_sensor.md b/doc/polycli_p2p_sensor.md index e08106ac3..e88d6cae0 100644 --- a/doc/polycli_p2p_sensor.md +++ b/doc/polycli_p2p_sensor.md @@ -178,6 +178,7 @@ polycli p2p sensor amoy-nodes.json \ --write-block-events write block events to database (default true) -B, --write-blocks write blocks to database (default true) --write-first-block-event write one block event on first-seen only (requires --write-block-events=false) + --write-first-tx-event write one transaction event on first-seen only (requires --write-tx-events=false) --write-peers write peers to database (default true) --write-tx-events write transaction events to database (this option can significantly increase CPU and memory usage) (default true) -t, --write-txs write transactions to database (this option can significantly increase CPU and memory usage) (default true) diff --git a/p2p/database/database.go b/p2p/database/database.go index dc1cd72d6..d31e7cdb0 100644 --- a/p2p/database/database.go +++ b/p2p/database/database.go @@ -54,6 +54,7 @@ type Database interface { ShouldWriteFirstBlockEvent() bool ShouldWriteTransactions() bool ShouldWriteTransactionEvents() bool + ShouldWriteFirstTransactionEvent() bool ShouldWritePeers() bool // NodeList will return a list of enode URLs. diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index 41857d943..82ef22ecb 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -29,18 +29,19 @@ const ( // Datastore wraps the datastore client, stores the sensorID, and other // information needed when writing blocks and transactions. type Datastore struct { - client *datastore.Client - sensorID string - chainID *big.Int - maxConcurrency int - shouldWriteBlocks bool - shouldWriteBlockEvents bool - shouldWriteFirstBlockEvent bool - shouldWriteTransactions bool - shouldWriteTransactionEvents bool - shouldWritePeers bool - jobs chan struct{} - ttl time.Duration + client *datastore.Client + sensorID string + chainID *big.Int + maxConcurrency int + shouldWriteBlocks bool + shouldWriteBlockEvents bool + shouldWriteFirstBlockEvent bool + shouldWriteTransactions bool + shouldWriteTransactionEvents bool + shouldWriteFirstTransactionEvent bool + shouldWritePeers bool + jobs chan struct{} + ttl time.Duration } // DatastoreEvent can represent a peer sending the sensor a transaction hash or @@ -94,20 +95,21 @@ type DatastoreBlock struct { // not indexed because there is a max sized for indexed byte slices, which Data // will occasionally exceed. type DatastoreTransaction struct { - Data []byte `datastore:",noindex"` - From string - Gas string - GasFeeCap string - GasPrice string - GasTipCap string - Nonce string - To string - Value string - V, R, S string `datastore:",noindex"` - Time time.Time - TimeFirstSeen time.Time - TTL time.Time - Type int16 + Data []byte `datastore:",noindex"` + From string + Gas string + GasFeeCap string + GasPrice string + GasTipCap string + Nonce string + To string + Value string + V, R, S string `datastore:",noindex"` + Time time.Time + TimeFirstSeen time.Time + TTL time.Time + Type int16 + SensorFirstSeen string } type DatastorePeer struct { @@ -121,18 +123,19 @@ type DatastorePeer struct { // DatastoreOptions is used when creating a NewDatastore. type DatastoreOptions struct { - ProjectID string - DatabaseID string - SensorID string - ChainID uint64 - MaxConcurrency int - ShouldWriteBlocks bool - ShouldWriteBlockEvents bool - ShouldWriteFirstBlockEvent bool - ShouldWriteTransactions bool - ShouldWriteTransactionEvents bool - ShouldWritePeers bool - TTL time.Duration + ProjectID string + DatabaseID string + SensorID string + ChainID uint64 + MaxConcurrency int + ShouldWriteBlocks bool + ShouldWriteBlockEvents bool + ShouldWriteFirstBlockEvent bool + ShouldWriteTransactions bool + ShouldWriteTransactionEvents bool + ShouldWriteFirstTransactionEvent bool + ShouldWritePeers bool + TTL time.Duration } // NewDatastore connects to datastore and creates the client. This should @@ -144,18 +147,19 @@ func NewDatastore(ctx context.Context, opts DatastoreOptions) Database { } return &Datastore{ - client: client, - sensorID: opts.SensorID, - chainID: new(big.Int).SetUint64(opts.ChainID), - maxConcurrency: opts.MaxConcurrency, - shouldWriteBlocks: opts.ShouldWriteBlocks, - shouldWriteBlockEvents: opts.ShouldWriteBlockEvents, - shouldWriteFirstBlockEvent: opts.ShouldWriteFirstBlockEvent, - shouldWriteTransactions: opts.ShouldWriteTransactions, - shouldWriteTransactionEvents: opts.ShouldWriteTransactionEvents, - shouldWritePeers: opts.ShouldWritePeers, - jobs: make(chan struct{}, opts.MaxConcurrency), - ttl: opts.TTL, + client: client, + sensorID: opts.SensorID, + chainID: new(big.Int).SetUint64(opts.ChainID), + maxConcurrency: opts.MaxConcurrency, + shouldWriteBlocks: opts.ShouldWriteBlocks, + shouldWriteBlockEvents: opts.ShouldWriteBlockEvents, + shouldWriteFirstBlockEvent: opts.ShouldWriteFirstBlockEvent, + shouldWriteTransactions: opts.ShouldWriteTransactions, + shouldWriteTransactionEvents: opts.ShouldWriteTransactionEvents, + shouldWriteFirstTransactionEvent: opts.ShouldWriteFirstTransactionEvent, + shouldWritePeers: opts.ShouldWritePeers, + jobs: make(chan struct{}, opts.MaxConcurrency), + ttl: opts.TTL, } } @@ -432,22 +436,23 @@ func (d *Datastore) newDatastoreTransaction(tx *types.Transaction, tfs time.Time } return &DatastoreTransaction{ - Data: tx.Data(), - From: from, - Gas: fmt.Sprint(tx.Gas()), - GasFeeCap: tx.GasFeeCap().String(), - GasPrice: tx.GasPrice().String(), - GasTipCap: tx.GasTipCap().String(), - Nonce: fmt.Sprint(tx.Nonce()), - To: to, - Value: tx.Value().String(), - V: v.String(), - R: r.String(), - S: s.String(), - Time: tx.Time(), - TimeFirstSeen: tfs, - TTL: tfs.Add(d.ttl), - Type: int16(tx.Type()), + Data: tx.Data(), + From: from, + Gas: fmt.Sprint(tx.Gas()), + GasFeeCap: tx.GasFeeCap().String(), + GasPrice: tx.GasPrice().String(), + GasTipCap: tx.GasTipCap().String(), + Nonce: fmt.Sprint(tx.Nonce()), + To: to, + Value: tx.Value().String(), + V: v.String(), + R: r.String(), + S: s.String(), + Time: tx.Time(), + TimeFirstSeen: tfs, + TTL: tfs.Add(d.ttl), + Type: int16(tx.Type()), + SensorFirstSeen: d.sensorID, } } @@ -639,18 +644,36 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has } } -// writeTransactions will write the transactions to datastore and return the -// transaction hashes. +// writeTransactions will write the transactions to datastore, skipping +// transactions that already exist with an earlier or equal TimeFirstSeen. func (d *Datastore) writeTransactions(ctx context.Context, txs []*types.Transaction, tfs time.Time) { keys := make([]*datastore.Key, 0, len(txs)) - transactions := make([]*DatastoreTransaction, 0, len(txs)) - for _, tx := range txs { keys = append(keys, datastore.NameKey(TransactionsKind, tx.Hash().Hex(), nil)) + } + + // Fetch existing transactions to check TimeFirstSeen + existing := make([]*DatastoreTransaction, len(keys)) + _ = d.client.GetMulti(ctx, keys, existing) // Ignore errors - missing keys return nil + + transactions := make([]*DatastoreTransaction, 0, len(txs)) + keysToWrite := make([]*datastore.Key, 0, len(txs)) + + for i, tx := range txs { + // Skip if existing record has an earlier or equal TimeFirstSeen + if existing[i] != nil && !existing[i].TimeFirstSeen.IsZero() && !tfs.Before(existing[i].TimeFirstSeen) { + continue + } + + keysToWrite = append(keysToWrite, keys[i]) transactions = append(transactions, d.newDatastoreTransaction(tx, tfs)) } - if _, err := d.client.PutMulti(ctx, keys, transactions); err != nil { + if len(keysToWrite) == 0 { + return + } + + if _, err := d.client.PutMulti(ctx, keysToWrite, transactions); err != nil { log.Error().Err(err).Msg("Failed to write transactions") } } @@ -659,6 +682,10 @@ func (d *Datastore) ShouldWriteFirstBlockEvent() bool { return d.shouldWriteFirstBlockEvent } +func (d *Datastore) ShouldWriteFirstTransactionEvent() bool { + return d.shouldWriteFirstTransactionEvent +} + func (d *Datastore) NodeList(ctx context.Context, limit int) ([]string, error) { query := datastore.NewQuery(BlockEventsKind).Order("-Time") iter := d.client.Run(ctx, query) diff --git a/p2p/database/json.go b/p2p/database/json.go index d1a9ab0fc..633ce0c79 100644 --- a/p2p/database/json.go +++ b/p2p/database/json.go @@ -384,6 +384,11 @@ func (j *JSONDatabase) ShouldWriteTransactions() bool { return j.shouldWriteTransactions } +// ShouldWriteFirstTransactionEvent returns false for JSON database. +func (j *JSONDatabase) ShouldWriteFirstTransactionEvent() bool { + return false +} + // ShouldWriteTransactionEvents returns the configured value. func (j *JSONDatabase) ShouldWriteTransactionEvents() bool { return j.shouldWriteTransactionEvents diff --git a/p2p/database/nodb.go b/p2p/database/nodb.go index 1bc003980..6a5337968 100644 --- a/p2p/database/nodb.go +++ b/p2p/database/nodb.go @@ -84,6 +84,11 @@ func (n *nodb) ShouldWriteTransactionEvents() bool { return false } +// ShouldWriteFirstTransactionEvent returns false. +func (n *nodb) ShouldWriteFirstTransactionEvent() bool { + return false +} + // ShouldWritePeers returns false. func (n *nodb) ShouldWritePeers() bool { return false diff --git a/p2p/protocol.go b/p2p/protocol.go index 21fc779b3..de2b7a395 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -1234,7 +1234,7 @@ func (c *conn) handleNewPooledTransactionHashes(version uint, msg ethp2p.Msg) er c.countMsgReceived(name, float64(len(hashes))) - if !c.db.ShouldWriteTransactions() || !c.db.ShouldWriteTransactionEvents() { + if !c.db.ShouldWriteTransactions() && !c.db.ShouldWriteTransactionEvents() { return nil } From 65b46e5d9fa9ea8adc3ce49e3d8453d4e2006f86 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Mon, 4 May 2026 20:42:22 -0400 Subject: [PATCH 2/7] feat: add worker pool for transaction writes and cache-first deduplication - Add fixed worker pool (default 100 workers) for transaction and transaction event writes using unbounded linked list queue - Check transaction cache before writing to deduplicate across peers - Add Close() method to Database interface for graceful shutdown - Add --write-workers flag to configure worker count - Keep semaphore pattern for block writes (lower volume) Co-Authored-By: Claude Opus 4.5 --- cmd/p2p/sensor/sensor.go | 5 ++ p2p/database/database.go | 4 + p2p/database/datastore.go | 157 ++++++++++++++++++++++++++++++++++++-- p2p/database/json.go | 3 + p2p/database/nodb.go | 3 + p2p/protocol.go | 38 ++++++--- 6 files changed, 193 insertions(+), 17 deletions(-) diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index 02accd81f..13388a093 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -45,6 +45,7 @@ type ( SensorID string MaxPeers int MaxDatabaseConcurrency int + WriteWorkers int ShouldWriteBlocks bool ShouldWriteBlockEvents bool ShouldWriteFirstBlockEvent bool @@ -271,6 +272,7 @@ var SensorCmd = &cobra.Command{ return err } defer stopServer(&server) + defer db.Close() defer conns.Close() events := make(chan *ethp2p.PeerEvent) @@ -443,6 +445,7 @@ func newDatabase(ctx context.Context) (database.Database, error) { SensorID: inputSensorParams.SensorID, ChainID: inputSensorParams.NetworkID, MaxConcurrency: inputSensorParams.MaxDatabaseConcurrency, + WriteWorkers: inputSensorParams.WriteWorkers, ShouldWriteBlocks: inputSensorParams.ShouldWriteBlocks, ShouldWriteBlockEvents: inputSensorParams.ShouldWriteBlockEvents, ShouldWriteFirstBlockEvent: inputSensorParams.ShouldWriteFirstBlockEvent, @@ -483,6 +486,8 @@ func init() { f.IntVarP(&inputSensorParams.MaxDatabaseConcurrency, "max-db-concurrency", "D", 10000, `maximum number of concurrent database operations to perform (increasing this will result in less chance of missing data but can significantly increase memory usage)`) + f.IntVar(&inputSensorParams.WriteWorkers, "write-workers", 100, + "number of transaction write worker goroutines") f.BoolVarP(&inputSensorParams.ShouldWriteBlocks, "write-blocks", "B", true, "write blocks to database") f.BoolVar(&inputSensorParams.ShouldWriteBlockEvents, "write-block-events", true, "write block events to database") f.BoolVar(&inputSensorParams.ShouldWriteFirstBlockEvent, "write-first-block-event", false, diff --git a/p2p/database/database.go b/p2p/database/database.go index d31e7cdb0..9505a234c 100644 --- a/p2p/database/database.go +++ b/p2p/database/database.go @@ -48,6 +48,10 @@ type Database interface { // client has not been initialized this will always return true. HasBlock(context.Context, common.Hash) bool + // Close gracefully shuts down the database, waiting for pending writes to + // complete before returning. + Close() + MaxConcurrentWrites() int ShouldWriteBlocks() bool ShouldWriteBlockEvents() bool diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index 82ef22ecb..9808e78c1 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -1,9 +1,11 @@ package database import ( + "container/list" "context" "fmt" "math/big" + "sync" "time" "cloud.google.com/go/datastore" @@ -26,6 +28,34 @@ const ( MaxAttempts = 5 ) +// writeJobType identifies the type of database write operation. +type writeJobType int + +const ( + jobWriteTransactions writeJobType = iota + jobWriteTransactionEvents +) + +// writeJob represents a database write operation queued for processing. +type writeJob struct { + jobType writeJobType + ctx context.Context + data any +} + +// txWriteData holds data for writing transactions. +type txWriteData struct { + txs []*types.Transaction + tfs time.Time +} + +// txEventWriteData holds data for writing transaction events. +type txEventWriteData struct { + peer *enode.Node + hashes []common.Hash + tfs time.Time +} + // Datastore wraps the datastore client, stores the sensorID, and other // information needed when writing blocks and transactions. type Datastore struct { @@ -40,8 +70,18 @@ type Datastore struct { shouldWriteTransactionEvents bool shouldWriteFirstTransactionEvent bool shouldWritePeers bool - jobs chan struct{} ttl time.Duration + + // Semaphore for non-transaction writes (blocks, peers, etc.) + jobs chan struct{} + + // Worker pool with unbounded queue for transaction writes only + queue *list.List // Unbounded linked list of writeJob + queueMu sync.Mutex + queueCond *sync.Cond // Signal workers when job available + numWorkers int + wg sync.WaitGroup + closed bool } // DatastoreEvent can represent a peer sending the sensor a transaction hash or @@ -128,6 +168,7 @@ type DatastoreOptions struct { SensorID string ChainID uint64 MaxConcurrency int + WriteWorkers int ShouldWriteBlocks bool ShouldWriteBlockEvents bool ShouldWriteFirstBlockEvent bool @@ -146,7 +187,7 @@ func NewDatastore(ctx context.Context, opts DatastoreOptions) Database { log.Error().Err(err).Msg("Could not connect to Datastore") } - return &Datastore{ + d := &Datastore{ client: client, sensorID: opts.SensorID, chainID: new(big.Int).SetUint64(opts.ChainID), @@ -158,13 +199,104 @@ func NewDatastore(ctx context.Context, opts DatastoreOptions) Database { shouldWriteTransactionEvents: opts.ShouldWriteTransactionEvents, shouldWriteFirstTransactionEvent: opts.ShouldWriteFirstTransactionEvent, shouldWritePeers: opts.ShouldWritePeers, - jobs: make(chan struct{}, opts.MaxConcurrency), ttl: opts.TTL, + jobs: make(chan struct{}, opts.MaxConcurrency), + queue: list.New(), + numWorkers: opts.WriteWorkers, + } + d.queueCond = sync.NewCond(&d.queueMu) + + // Start worker pool for transaction writes + for i := 0; i < d.numWorkers; i++ { + d.wg.Add(1) + go d.writeWorker() + } + + log.Info().Int("workers", opts.WriteWorkers).Msg("Started datastore transaction write workers") + + return d +} + +// enqueue adds a job to the unbounded queue (never blocks, never drops). +func (d *Datastore) enqueue(job writeJob) { + d.queueMu.Lock() + d.queue.PushBack(job) + queueLen := d.queue.Len() + d.queueMu.Unlock() + + d.queueCond.Signal() + + // Log warning if queue is growing large + if queueLen > 10000 && queueLen%10000 == 0 { + log.Warn().Int("queue_len", queueLen).Msg("Write queue growing large") } } +// dequeue blocks until a job is available or queue is closed. +func (d *Datastore) dequeue() (writeJob, bool) { + d.queueMu.Lock() + defer d.queueMu.Unlock() + + for d.queue.Len() == 0 && !d.closed { + d.queueCond.Wait() + } + + if d.closed && d.queue.Len() == 0 { + return writeJob{}, false + } + + elem := d.queue.Front() + d.queue.Remove(elem) + return elem.Value.(writeJob), true +} + +// writeWorker is a worker goroutine that processes jobs from the queue. +func (d *Datastore) writeWorker() { + defer d.wg.Done() + for { + job, ok := d.dequeue() + if !ok { + return // Queue closed + } + d.processJob(job) + } +} + +// processJob handles a single write job based on its type. +func (d *Datastore) processJob(job writeJob) { + switch job.jobType { + case jobWriteTransactions: + data := job.data.(*txWriteData) + d.writeTransactions(job.ctx, data.txs, data.tfs) + case jobWriteTransactionEvents: + data := job.data.(*txEventWriteData) + d.writeEvents(job.ctx, data.peer, TransactionEventsKind, data.hashes, TransactionsKind, data.tfs) + } +} + +// Close gracefully shuts down workers after draining the queue. +// Close gracefully shuts down workers after draining the queue. +func (d *Datastore) Close() { + d.queueMu.Lock() + d.closed = true + d.queueMu.Unlock() + + d.queueCond.Broadcast() // Wake all workers + d.wg.Wait() // Wait for all jobs to complete + + log.Info().Msg("Datastore write workers stopped") +} + +// QueueLen returns current queue depth for monitoring. +func (d *Datastore) QueueLen() int { + d.queueMu.Lock() + defer d.queueMu.Unlock() + return d.queue.Len() +} + // runAsync executes the provided function asynchronously with concurrency control. // It uses the jobs channel as a semaphore to limit concurrent operations. +// Used for non-transaction writes (blocks, peers, etc.). func (d *Datastore) runAsync(fn func()) { d.jobs <- struct{}{} go func() { @@ -298,8 +430,13 @@ func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs } if d.ShouldWriteTransactions() { - d.runAsync(func() { - d.writeTransactions(ctx, txs, tfs) + d.enqueue(writeJob{ + jobType: jobWriteTransactions, + ctx: ctx, + data: &txWriteData{ + txs: txs, + tfs: tfs, + }, }) } @@ -309,8 +446,14 @@ func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs hashes = append(hashes, tx.Hash()) } - d.runAsync(func() { - d.writeEvents(ctx, peer, TransactionEventsKind, hashes, TransactionsKind, tfs) + d.enqueue(writeJob{ + jobType: jobWriteTransactionEvents, + ctx: ctx, + data: &txEventWriteData{ + peer: peer, + hashes: hashes, + tfs: tfs, + }, }) } } diff --git a/p2p/database/json.go b/p2p/database/json.go index 633ce0c79..cfc377145 100644 --- a/p2p/database/json.go +++ b/p2p/database/json.go @@ -403,3 +403,6 @@ func (j *JSONDatabase) ShouldWritePeers() bool { func (j *JSONDatabase) NodeList(ctx context.Context, limit int) ([]string, error) { return []string{}, nil } + +// Close is a no-op for JSON database as writes are synchronous. +func (j *JSONDatabase) Close() {} diff --git a/p2p/database/nodb.go b/p2p/database/nodb.go index 6a5337968..a2fc9ff84 100644 --- a/p2p/database/nodb.go +++ b/p2p/database/nodb.go @@ -98,3 +98,6 @@ func (n *nodb) ShouldWritePeers() bool { func (n *nodb) NodeList(ctx context.Context, limit int) ([]string, error) { return []string{}, nil } + +// Close is a no-op for nodb as no writes occur. +func (n *nodb) Close() {} diff --git a/p2p/protocol.go b/p2p/protocol.go index de2b7a395..6923285c1 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -879,15 +879,24 @@ func (c *conn) handleTransactions(ctx context.Context, msg ethp2p.Msg) error { c.addKnownTx(tx.Hash()) } - if len(txs) > 0 { - c.db.WriteTransactions(ctx, c.node, txs, tfs) + // Check cache FIRST to filter out already-seen transactions + newTxs := make([]*types.Transaction, 0, len(txs)) + for _, tx := range txs { + if _, exists := c.conns.GetTx(tx.Hash()); !exists { + newTxs = append(newTxs, tx) + } } - // Cache transactions for duplicate detection and serving to peers (single lock) - hashes := c.conns.AddTxs(txs) + // Add to cache BEFORE writing (prevents duplicate writes from other peers) + hashes := c.conns.AddTxs(newTxs) + + // Only write NEW transactions (cache miss = needs DB write) + if len(newTxs) > 0 { + c.db.WriteTransactions(ctx, c.node, newTxs, tfs) + } // Broadcast transactions or hashes to other peers asynchronously - go c.conns.BroadcastTxs(types.Transactions(txs)) + go c.conns.BroadcastTxs(types.Transactions(newTxs)) go c.conns.BroadcastTxHashes(hashes) return nil @@ -1266,15 +1275,24 @@ func (c *conn) handlePooledTransactions(ctx context.Context, msg ethp2p.Msg) err c.addKnownTx(tx.Hash()) } - if len(txs) > 0 { - c.db.WriteTransactions(ctx, c.node, txs, tfs) + // Check cache FIRST to filter out already-seen transactions + newTxs := make([]*types.Transaction, 0, len(txs)) + for _, tx := range txs { + if _, exists := c.conns.GetTx(tx.Hash()); !exists { + newTxs = append(newTxs, tx) + } } - // Cache transactions for duplicate detection and serving to peers (single lock) - hashes := c.conns.AddTxs(txs) + // Add to cache BEFORE writing (prevents duplicate writes from other peers) + hashes := c.conns.AddTxs(newTxs) + + // Only write NEW transactions (cache miss = needs DB write) + if len(newTxs) > 0 { + c.db.WriteTransactions(ctx, c.node, newTxs, tfs) + } // Broadcast transactions or hashes to other peers asynchronously - go c.conns.BroadcastTxs(types.Transactions(txs)) + go c.conns.BroadcastTxs(types.Transactions(newTxs)) go c.conns.BroadcastTxHashes(hashes) return nil From 366a53ef9659fb2eabcb14b5d78eea7d0c79f019 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Mon, 4 May 2026 20:45:28 -0400 Subject: [PATCH 3/7] refactor: revert to semaphore pattern for transaction writes Remove the worker pool implementation and revert to the original semaphore pattern for database writes. The cache-first transaction deduplication (added in previous commit) should significantly reduce write volume, making the simpler semaphore pattern sufficient. Changes: - Remove worker pool types, queue, and worker goroutines from datastore.go - Remove WriteWorkers from DatastoreOptions - Remove --write-workers flag from sensor command - Keep using runAsync() semaphore pattern for all database writes Co-Authored-By: Claude Opus 4.5 --- cmd/p2p/sensor/sensor.go | 4 - p2p/database/datastore.go | 161 +++----------------------------------- 2 files changed, 11 insertions(+), 154 deletions(-) diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index 13388a093..f3b8d7b4f 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -45,7 +45,6 @@ type ( SensorID string MaxPeers int MaxDatabaseConcurrency int - WriteWorkers int ShouldWriteBlocks bool ShouldWriteBlockEvents bool ShouldWriteFirstBlockEvent bool @@ -445,7 +444,6 @@ func newDatabase(ctx context.Context) (database.Database, error) { SensorID: inputSensorParams.SensorID, ChainID: inputSensorParams.NetworkID, MaxConcurrency: inputSensorParams.MaxDatabaseConcurrency, - WriteWorkers: inputSensorParams.WriteWorkers, ShouldWriteBlocks: inputSensorParams.ShouldWriteBlocks, ShouldWriteBlockEvents: inputSensorParams.ShouldWriteBlockEvents, ShouldWriteFirstBlockEvent: inputSensorParams.ShouldWriteFirstBlockEvent, @@ -486,8 +484,6 @@ func init() { f.IntVarP(&inputSensorParams.MaxDatabaseConcurrency, "max-db-concurrency", "D", 10000, `maximum number of concurrent database operations to perform (increasing this will result in less chance of missing data but can significantly increase memory usage)`) - f.IntVar(&inputSensorParams.WriteWorkers, "write-workers", 100, - "number of transaction write worker goroutines") f.BoolVarP(&inputSensorParams.ShouldWriteBlocks, "write-blocks", "B", true, "write blocks to database") f.BoolVar(&inputSensorParams.ShouldWriteBlockEvents, "write-block-events", true, "write block events to database") f.BoolVar(&inputSensorParams.ShouldWriteFirstBlockEvent, "write-first-block-event", false, diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index 9808e78c1..5b31e657d 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -1,11 +1,9 @@ package database import ( - "container/list" "context" "fmt" "math/big" - "sync" "time" "cloud.google.com/go/datastore" @@ -28,34 +26,6 @@ const ( MaxAttempts = 5 ) -// writeJobType identifies the type of database write operation. -type writeJobType int - -const ( - jobWriteTransactions writeJobType = iota - jobWriteTransactionEvents -) - -// writeJob represents a database write operation queued for processing. -type writeJob struct { - jobType writeJobType - ctx context.Context - data any -} - -// txWriteData holds data for writing transactions. -type txWriteData struct { - txs []*types.Transaction - tfs time.Time -} - -// txEventWriteData holds data for writing transaction events. -type txEventWriteData struct { - peer *enode.Node - hashes []common.Hash - tfs time.Time -} - // Datastore wraps the datastore client, stores the sensorID, and other // information needed when writing blocks and transactions. type Datastore struct { @@ -71,17 +41,7 @@ type Datastore struct { shouldWriteFirstTransactionEvent bool shouldWritePeers bool ttl time.Duration - - // Semaphore for non-transaction writes (blocks, peers, etc.) - jobs chan struct{} - - // Worker pool with unbounded queue for transaction writes only - queue *list.List // Unbounded linked list of writeJob - queueMu sync.Mutex - queueCond *sync.Cond // Signal workers when job available - numWorkers int - wg sync.WaitGroup - closed bool + jobs chan struct{} } // DatastoreEvent can represent a peer sending the sensor a transaction hash or @@ -168,7 +128,6 @@ type DatastoreOptions struct { SensorID string ChainID uint64 MaxConcurrency int - WriteWorkers int ShouldWriteBlocks bool ShouldWriteBlockEvents bool ShouldWriteFirstBlockEvent bool @@ -187,7 +146,7 @@ func NewDatastore(ctx context.Context, opts DatastoreOptions) Database { log.Error().Err(err).Msg("Could not connect to Datastore") } - d := &Datastore{ + return &Datastore{ client: client, sensorID: opts.SensorID, chainID: new(big.Int).SetUint64(opts.ChainID), @@ -199,104 +158,13 @@ func NewDatastore(ctx context.Context, opts DatastoreOptions) Database { shouldWriteTransactionEvents: opts.ShouldWriteTransactionEvents, shouldWriteFirstTransactionEvent: opts.ShouldWriteFirstTransactionEvent, shouldWritePeers: opts.ShouldWritePeers, - ttl: opts.TTL, jobs: make(chan struct{}, opts.MaxConcurrency), - queue: list.New(), - numWorkers: opts.WriteWorkers, - } - d.queueCond = sync.NewCond(&d.queueMu) - - // Start worker pool for transaction writes - for i := 0; i < d.numWorkers; i++ { - d.wg.Add(1) - go d.writeWorker() - } - - log.Info().Int("workers", opts.WriteWorkers).Msg("Started datastore transaction write workers") - - return d -} - -// enqueue adds a job to the unbounded queue (never blocks, never drops). -func (d *Datastore) enqueue(job writeJob) { - d.queueMu.Lock() - d.queue.PushBack(job) - queueLen := d.queue.Len() - d.queueMu.Unlock() - - d.queueCond.Signal() - - // Log warning if queue is growing large - if queueLen > 10000 && queueLen%10000 == 0 { - log.Warn().Int("queue_len", queueLen).Msg("Write queue growing large") - } -} - -// dequeue blocks until a job is available or queue is closed. -func (d *Datastore) dequeue() (writeJob, bool) { - d.queueMu.Lock() - defer d.queueMu.Unlock() - - for d.queue.Len() == 0 && !d.closed { - d.queueCond.Wait() - } - - if d.closed && d.queue.Len() == 0 { - return writeJob{}, false - } - - elem := d.queue.Front() - d.queue.Remove(elem) - return elem.Value.(writeJob), true -} - -// writeWorker is a worker goroutine that processes jobs from the queue. -func (d *Datastore) writeWorker() { - defer d.wg.Done() - for { - job, ok := d.dequeue() - if !ok { - return // Queue closed - } - d.processJob(job) - } -} - -// processJob handles a single write job based on its type. -func (d *Datastore) processJob(job writeJob) { - switch job.jobType { - case jobWriteTransactions: - data := job.data.(*txWriteData) - d.writeTransactions(job.ctx, data.txs, data.tfs) - case jobWriteTransactionEvents: - data := job.data.(*txEventWriteData) - d.writeEvents(job.ctx, data.peer, TransactionEventsKind, data.hashes, TransactionsKind, data.tfs) + ttl: opts.TTL, } } -// Close gracefully shuts down workers after draining the queue. -// Close gracefully shuts down workers after draining the queue. -func (d *Datastore) Close() { - d.queueMu.Lock() - d.closed = true - d.queueMu.Unlock() - - d.queueCond.Broadcast() // Wake all workers - d.wg.Wait() // Wait for all jobs to complete - - log.Info().Msg("Datastore write workers stopped") -} - -// QueueLen returns current queue depth for monitoring. -func (d *Datastore) QueueLen() int { - d.queueMu.Lock() - defer d.queueMu.Unlock() - return d.queue.Len() -} - // runAsync executes the provided function asynchronously with concurrency control. // It uses the jobs channel as a semaphore to limit concurrent operations. -// Used for non-transaction writes (blocks, peers, etc.). func (d *Datastore) runAsync(fn func()) { d.jobs <- struct{}{} go func() { @@ -305,6 +173,9 @@ func (d *Datastore) runAsync(fn func()) { }() } +// Close is a no-op for the semaphore-based datastore. +func (d *Datastore) Close() {} + // WriteBlock writes the block and the block event to datastore. func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *types.Block, td *big.Int, tfs time.Time) { if d.client == nil { @@ -423,6 +294,7 @@ func (d *Datastore) writeBlockHashFirstSeen(ctx context.Context, peer *enode.Nod } } +// WriteTransactions will write the transactions and transaction events to datastore. // WriteTransactions will write the transactions and transaction events to datastore. func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs []*types.Transaction, tfs time.Time) { if d.client == nil { @@ -430,13 +302,8 @@ func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs } if d.ShouldWriteTransactions() { - d.enqueue(writeJob{ - jobType: jobWriteTransactions, - ctx: ctx, - data: &txWriteData{ - txs: txs, - tfs: tfs, - }, + d.runAsync(func() { + d.writeTransactions(ctx, txs, tfs) }) } @@ -446,14 +313,8 @@ func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs hashes = append(hashes, tx.Hash()) } - d.enqueue(writeJob{ - jobType: jobWriteTransactionEvents, - ctx: ctx, - data: &txEventWriteData{ - peer: peer, - hashes: hashes, - tfs: tfs, - }, + d.runAsync(func() { + d.writeEvents(ctx, peer, TransactionEventsKind, hashes, TransactionsKind, tfs) }) } } From 164d2cd84cb9d1072109dec063431a0b925f7117 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Mon, 4 May 2026 20:50:38 -0400 Subject: [PATCH 4/7] chore: go fmt --- cmd/p2p/sensor/sensor.go | 106 +++++++++++++++++++-------------------- 1 file changed, 53 insertions(+), 53 deletions(-) diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index f3b8d7b4f..c52de6af0 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -35,59 +35,59 @@ import ( type ( sensorParams struct { - Bootnodes string - NetworkID uint64 - NodesFile string - StaticNodesFile string - TrustedNodesFile string - ProjectID string - DatabaseID string - SensorID string - MaxPeers int - MaxDatabaseConcurrency int - ShouldWriteBlocks bool - ShouldWriteBlockEvents bool - ShouldWriteFirstBlockEvent bool - ShouldWriteTransactions bool - ShouldWriteTransactionEvents bool - ShouldWriteFirstTransactionEvent bool - ShouldWritePeers bool - ShouldBroadcastTx bool - ShouldBroadcastTxHashes bool - ShouldBroadcastBlocks bool - ShouldBroadcastBlockHashes bool - BroadcastWorkers int - TxBatchTimeout time.Duration - TxBroadcastQueueSize int - MaxTxPacketSize int - MaxQueuedTxs int - ShouldRunPprof bool - PprofPort uint - ShouldRunPrometheus bool - PrometheusPort uint - APIPort uint - RPCPort uint - KeyFile string - PrivateKey string - Port int - DiscoveryPort int - RPC string - GenesisHash string - ForkID []byte - DialRatio int - NAT string - TTL time.Duration - DiscoveryDNS string - Database string - NoDiscovery bool - ProxyRPC bool - ProxyRPCTimeout time.Duration - RequestsCache ds.LRUOptions - ParentsCache ds.LRUOptions - BlocksCache ds.LRUOptions - TxsCache ds.LRUOptions - KnownTxsBloom ds.BloomSetOptions - KnownBlocksMax int + Bootnodes string + NetworkID uint64 + NodesFile string + StaticNodesFile string + TrustedNodesFile string + ProjectID string + DatabaseID string + SensorID string + MaxPeers int + MaxDatabaseConcurrency int + ShouldWriteBlocks bool + ShouldWriteBlockEvents bool + ShouldWriteFirstBlockEvent bool + ShouldWriteTransactions bool + ShouldWriteTransactionEvents bool + ShouldWriteFirstTransactionEvent bool + ShouldWritePeers bool + ShouldBroadcastTx bool + ShouldBroadcastTxHashes bool + ShouldBroadcastBlocks bool + ShouldBroadcastBlockHashes bool + BroadcastWorkers int + TxBatchTimeout time.Duration + TxBroadcastQueueSize int + MaxTxPacketSize int + MaxQueuedTxs int + ShouldRunPprof bool + PprofPort uint + ShouldRunPrometheus bool + PrometheusPort uint + APIPort uint + RPCPort uint + KeyFile string + PrivateKey string + Port int + DiscoveryPort int + RPC string + GenesisHash string + ForkID []byte + DialRatio int + NAT string + TTL time.Duration + DiscoveryDNS string + Database string + NoDiscovery bool + ProxyRPC bool + ProxyRPCTimeout time.Duration + RequestsCache ds.LRUOptions + ParentsCache ds.LRUOptions + BlocksCache ds.LRUOptions + TxsCache ds.LRUOptions + KnownTxsBloom ds.BloomSetOptions + KnownBlocksMax int bootnodes []*enode.Node staticNodes []*enode.Node From 9ee64a9e4b34e9de89979271cd3596593fa43588 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Mon, 4 May 2026 20:52:30 -0400 Subject: [PATCH 5/7] fix: reduce indexes --- p2p/database/datastore.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index 5b31e657d..677f1c869 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -97,13 +97,13 @@ type DatastoreBlock struct { type DatastoreTransaction struct { Data []byte `datastore:",noindex"` From string - Gas string - GasFeeCap string - GasPrice string - GasTipCap string - Nonce string + Gas string `datastore:",noindex"` + GasFeeCap string `datastore:",noindex"` + GasPrice string `datastore:",noindex"` + GasTipCap string `datastore:",noindex"` + Nonce string `datastore:",noindex"` To string - Value string + Value string `datastore:",noindex"` V, R, S string `datastore:",noindex"` Time time.Time TimeFirstSeen time.Time From 4ec29bf02b04688af344fa7fd92531a4e65c6371 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Mon, 4 May 2026 20:55:27 -0400 Subject: [PATCH 6/7] refactor: remove unused Close() method from Database interface The Close() method was added for the worker pool implementation but is no longer needed after reverting to the semaphore pattern. With the semaphore pattern, in-flight goroutines complete on their own without needing explicit shutdown coordination. Co-Authored-By: Claude Opus 4.5 --- cmd/p2p/sensor/sensor.go | 1 - p2p/database/database.go | 4 ---- p2p/database/datastore.go | 1 - p2p/database/json.go | 2 -- p2p/database/nodb.go | 2 -- 5 files changed, 10 deletions(-) diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index c52de6af0..add73dc30 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -271,7 +271,6 @@ var SensorCmd = &cobra.Command{ return err } defer stopServer(&server) - defer db.Close() defer conns.Close() events := make(chan *ethp2p.PeerEvent) diff --git a/p2p/database/database.go b/p2p/database/database.go index 9505a234c..d31e7cdb0 100644 --- a/p2p/database/database.go +++ b/p2p/database/database.go @@ -48,10 +48,6 @@ type Database interface { // client has not been initialized this will always return true. HasBlock(context.Context, common.Hash) bool - // Close gracefully shuts down the database, waiting for pending writes to - // complete before returning. - Close() - MaxConcurrentWrites() int ShouldWriteBlocks() bool ShouldWriteBlockEvents() bool diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index 677f1c869..b653f3fc1 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -174,7 +174,6 @@ func (d *Datastore) runAsync(fn func()) { } // Close is a no-op for the semaphore-based datastore. -func (d *Datastore) Close() {} // WriteBlock writes the block and the block event to datastore. func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *types.Block, td *big.Int, tfs time.Time) { diff --git a/p2p/database/json.go b/p2p/database/json.go index cfc377145..4d6663bb6 100644 --- a/p2p/database/json.go +++ b/p2p/database/json.go @@ -404,5 +404,3 @@ func (j *JSONDatabase) NodeList(ctx context.Context, limit int) ([]string, error return []string{}, nil } -// Close is a no-op for JSON database as writes are synchronous. -func (j *JSONDatabase) Close() {} diff --git a/p2p/database/nodb.go b/p2p/database/nodb.go index a2fc9ff84..d1fcfaddb 100644 --- a/p2p/database/nodb.go +++ b/p2p/database/nodb.go @@ -99,5 +99,3 @@ func (n *nodb) NodeList(ctx context.Context, limit int) ([]string, error) { return []string{}, nil } -// Close is a no-op for nodb as no writes occur. -func (n *nodb) Close() {} From 80a818e16d5a9b310129f5b654f0c2b2e9773d84 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Mon, 4 May 2026 20:59:32 -0400 Subject: [PATCH 7/7] refactor: extract processTransactions helper to reduce duplication - Extract common transaction processing logic into processTransactions() - Consolidates duplicate code from handleTransactions and handlePooledTransactions - Remove orphaned comment about Close() method - Remove duplicate WriteTransactions comment Co-Authored-By: Claude Opus 4.5 --- p2p/database/datastore.go | 3 -- p2p/protocol.go | 87 ++++++++++++++------------------------- 2 files changed, 32 insertions(+), 58 deletions(-) diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index b653f3fc1..4d4af9579 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -173,8 +173,6 @@ func (d *Datastore) runAsync(fn func()) { }() } -// Close is a no-op for the semaphore-based datastore. - // WriteBlock writes the block and the block event to datastore. func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *types.Block, td *big.Int, tfs time.Time) { if d.client == nil { @@ -293,7 +291,6 @@ func (d *Datastore) writeBlockHashFirstSeen(ctx context.Context, peer *enode.Nod } } -// WriteTransactions will write the transactions and transaction events to datastore. // WriteTransactions will write the transactions and transaction events to datastore. func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs []*types.Transaction, tfs time.Time) { if d.client == nil { diff --git a/p2p/protocol.go b/p2p/protocol.go index 6923285c1..6debdba20 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -832,6 +832,36 @@ func (c *conn) decodeTxs(rawTxs []rlp.RawValue) []*types.Transaction { return txs } +// processTransactions handles the common logic for processing incoming transactions +// from both TransactionsMsg and PooledTransactionsMsg. It marks transactions as known, +// filters duplicates via cache, writes new transactions to database, and broadcasts. +func (c *conn) processTransactions(ctx context.Context, txs []*types.Transaction, tfs time.Time) { + // Mark transactions as known from this peer + for _, tx := range txs { + c.addKnownTx(tx.Hash()) + } + + // Check cache FIRST to filter out already-seen transactions + newTxs := make([]*types.Transaction, 0, len(txs)) + for _, tx := range txs { + if _, exists := c.conns.GetTx(tx.Hash()); !exists { + newTxs = append(newTxs, tx) + } + } + + // Add to cache BEFORE writing (prevents duplicate writes from other peers) + hashes := c.conns.AddTxs(newTxs) + + // Only write NEW transactions (cache miss = needs DB write) + if len(newTxs) > 0 { + c.db.WriteTransactions(ctx, c.node, newTxs, tfs) + } + + // Broadcast transactions or hashes to other peers asynchronously + go c.conns.BroadcastTxs(types.Transactions(newTxs)) + go c.conns.BroadcastTxHashes(hashes) +} + // encodeBlockBody converts a block to an eth.BlockBody with RLP-encoded fields. func encodeBlockBody(block *types.Block) (*eth.BlockBody, error) { txList, err := rlp.EncodeToRawList([]*types.Transaction(block.Transactions())) @@ -870,34 +900,8 @@ func (c *conn) handleTransactions(ctx context.Context, msg ethp2p.Msg) error { } txs := c.decodeTxs(rawTxs) - tfs := time.Now() - c.countMsgReceived((ð.TransactionsPacket{}).Name(), float64(len(txs))) - - // Mark transactions as known from this peer - for _, tx := range txs { - c.addKnownTx(tx.Hash()) - } - - // Check cache FIRST to filter out already-seen transactions - newTxs := make([]*types.Transaction, 0, len(txs)) - for _, tx := range txs { - if _, exists := c.conns.GetTx(tx.Hash()); !exists { - newTxs = append(newTxs, tx) - } - } - - // Add to cache BEFORE writing (prevents duplicate writes from other peers) - hashes := c.conns.AddTxs(newTxs) - - // Only write NEW transactions (cache miss = needs DB write) - if len(newTxs) > 0 { - c.db.WriteTransactions(ctx, c.node, newTxs, tfs) - } - - // Broadcast transactions or hashes to other peers asynchronously - go c.conns.BroadcastTxs(types.Transactions(newTxs)) - go c.conns.BroadcastTxHashes(hashes) + c.processTransactions(ctx, txs, time.Now()) return nil } @@ -1265,35 +1269,8 @@ func (c *conn) handlePooledTransactions(ctx context.Context, msg ethp2p.Msg) err } txs := c.decodeTxs(raw.Txs) - - tfs := time.Now() - c.countMsgReceived((*eth.PooledTransactionsPacket)(nil).Name(), float64(len(txs))) - - // Mark transactions as known from this peer - for _, tx := range txs { - c.addKnownTx(tx.Hash()) - } - - // Check cache FIRST to filter out already-seen transactions - newTxs := make([]*types.Transaction, 0, len(txs)) - for _, tx := range txs { - if _, exists := c.conns.GetTx(tx.Hash()); !exists { - newTxs = append(newTxs, tx) - } - } - - // Add to cache BEFORE writing (prevents duplicate writes from other peers) - hashes := c.conns.AddTxs(newTxs) - - // Only write NEW transactions (cache miss = needs DB write) - if len(newTxs) > 0 { - c.db.WriteTransactions(ctx, c.node, newTxs, tfs) - } - - // Broadcast transactions or hashes to other peers asynchronously - go c.conns.BroadcastTxs(types.Transactions(newTxs)) - go c.conns.BroadcastTxHashes(hashes) + c.processTransactions(ctx, txs, time.Now()) return nil }