diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index 957dd09ed..add73dc30 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -35,58 +35,59 @@ import ( type ( sensorParams struct { - Bootnodes string - NetworkID uint64 - NodesFile string - StaticNodesFile string - TrustedNodesFile string - ProjectID string - DatabaseID string - SensorID string - MaxPeers int - MaxDatabaseConcurrency int - ShouldWriteBlocks bool - ShouldWriteBlockEvents bool - ShouldWriteFirstBlockEvent bool - ShouldWriteTransactions bool - ShouldWriteTransactionEvents bool - ShouldWritePeers bool - ShouldBroadcastTx bool - ShouldBroadcastTxHashes bool - ShouldBroadcastBlocks bool - ShouldBroadcastBlockHashes bool - BroadcastWorkers int - TxBatchTimeout time.Duration - TxBroadcastQueueSize int - MaxTxPacketSize int - MaxQueuedTxs int - ShouldRunPprof bool - PprofPort uint - ShouldRunPrometheus bool - PrometheusPort uint - APIPort uint - RPCPort uint - KeyFile string - PrivateKey string - Port int - DiscoveryPort int - RPC string - GenesisHash string - ForkID []byte - DialRatio int - NAT string - TTL time.Duration - DiscoveryDNS string - Database string - NoDiscovery bool - ProxyRPC bool - ProxyRPCTimeout time.Duration - RequestsCache ds.LRUOptions - ParentsCache ds.LRUOptions - BlocksCache ds.LRUOptions - TxsCache ds.LRUOptions - KnownTxsBloom ds.BloomSetOptions - KnownBlocksMax int + Bootnodes string + NetworkID uint64 + NodesFile string + StaticNodesFile string + TrustedNodesFile string + ProjectID string + DatabaseID string + SensorID string + MaxPeers int + MaxDatabaseConcurrency int + ShouldWriteBlocks bool + ShouldWriteBlockEvents bool + ShouldWriteFirstBlockEvent bool + ShouldWriteTransactions bool + ShouldWriteTransactionEvents bool + ShouldWriteFirstTransactionEvent bool + ShouldWritePeers bool + ShouldBroadcastTx bool + ShouldBroadcastTxHashes bool + ShouldBroadcastBlocks bool + ShouldBroadcastBlockHashes bool + BroadcastWorkers int + TxBatchTimeout 
time.Duration + TxBroadcastQueueSize int + MaxTxPacketSize int + MaxQueuedTxs int + ShouldRunPprof bool + PprofPort uint + ShouldRunPrometheus bool + PrometheusPort uint + APIPort uint + RPCPort uint + KeyFile string + PrivateKey string + Port int + DiscoveryPort int + RPC string + GenesisHash string + ForkID []byte + DialRatio int + NAT string + TTL time.Duration + DiscoveryDNS string + Database string + NoDiscovery bool + ProxyRPC bool + ProxyRPCTimeout time.Duration + RequestsCache ds.LRUOptions + ParentsCache ds.LRUOptions + BlocksCache ds.LRUOptions + TxsCache ds.LRUOptions + KnownTxsBloom ds.BloomSetOptions + KnownBlocksMax int bootnodes []*enode.Node staticNodes []*enode.Node @@ -437,18 +438,19 @@ func newDatabase(ctx context.Context) (database.Database, error) { switch inputSensorParams.Database { case "datastore": return database.NewDatastore(ctx, database.DatastoreOptions{ - ProjectID: inputSensorParams.ProjectID, - DatabaseID: inputSensorParams.DatabaseID, - SensorID: inputSensorParams.SensorID, - ChainID: inputSensorParams.NetworkID, - MaxConcurrency: inputSensorParams.MaxDatabaseConcurrency, - ShouldWriteBlocks: inputSensorParams.ShouldWriteBlocks, - ShouldWriteBlockEvents: inputSensorParams.ShouldWriteBlockEvents, - ShouldWriteFirstBlockEvent: inputSensorParams.ShouldWriteFirstBlockEvent, - ShouldWriteTransactions: inputSensorParams.ShouldWriteTransactions, - ShouldWriteTransactionEvents: inputSensorParams.ShouldWriteTransactionEvents, - ShouldWritePeers: inputSensorParams.ShouldWritePeers, - TTL: inputSensorParams.TTL, + ProjectID: inputSensorParams.ProjectID, + DatabaseID: inputSensorParams.DatabaseID, + SensorID: inputSensorParams.SensorID, + ChainID: inputSensorParams.NetworkID, + MaxConcurrency: inputSensorParams.MaxDatabaseConcurrency, + ShouldWriteBlocks: inputSensorParams.ShouldWriteBlocks, + ShouldWriteBlockEvents: inputSensorParams.ShouldWriteBlockEvents, + ShouldWriteFirstBlockEvent: inputSensorParams.ShouldWriteFirstBlockEvent, + 
ShouldWriteTransactions: inputSensorParams.ShouldWriteTransactions, + ShouldWriteTransactionEvents: inputSensorParams.ShouldWriteTransactionEvents, + ShouldWriteFirstTransactionEvent: inputSensorParams.ShouldWriteFirstTransactionEvent, + ShouldWritePeers: inputSensorParams.ShouldWritePeers, + TTL: inputSensorParams.TTL, }), nil case "json": return database.NewJSONDatabase(database.JSONDatabaseOptions{ @@ -489,6 +491,8 @@ will result in less chance of missing data but can significantly increase memory `write transactions to database (this option can significantly increase CPU and memory usage)`) f.BoolVar(&inputSensorParams.ShouldWriteTransactionEvents, "write-tx-events", true, `write transaction events to database (this option can significantly increase CPU and memory usage)`) + f.BoolVar(&inputSensorParams.ShouldWriteFirstTransactionEvent, "write-first-tx-event", false, + "write one transaction event on first-seen only (requires --write-tx-events=false)") f.BoolVar(&inputSensorParams.ShouldWritePeers, "write-peers", true, "write peers to database") f.BoolVar(&inputSensorParams.ShouldBroadcastTx, "broadcast-txs", false, "broadcast full transactions to peers") f.BoolVar(&inputSensorParams.ShouldBroadcastTxHashes, "broadcast-tx-hashes", false, "broadcast transaction hashes to peers") diff --git a/doc/polycli_p2p_sensor.md b/doc/polycli_p2p_sensor.md index e08106ac3..e88d6cae0 100644 --- a/doc/polycli_p2p_sensor.md +++ b/doc/polycli_p2p_sensor.md @@ -178,6 +178,7 @@ polycli p2p sensor amoy-nodes.json \ --write-block-events write block events to database (default true) -B, --write-blocks write blocks to database (default true) --write-first-block-event write one block event on first-seen only (requires --write-block-events=false) + --write-first-tx-event write one transaction event on first-seen only (requires --write-tx-events=false) --write-peers write peers to database (default true) --write-tx-events write transaction events to database (this option can 
significantly increase CPU and memory usage) (default true) -t, --write-txs write transactions to database (this option can significantly increase CPU and memory usage) (default true) diff --git a/p2p/database/database.go b/p2p/database/database.go index dc1cd72d6..d31e7cdb0 100644 --- a/p2p/database/database.go +++ b/p2p/database/database.go @@ -54,6 +54,7 @@ type Database interface { ShouldWriteFirstBlockEvent() bool ShouldWriteTransactions() bool ShouldWriteTransactionEvents() bool + ShouldWriteFirstTransactionEvent() bool ShouldWritePeers() bool // NodeList will return a list of enode URLs. diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index 41857d943..4d4af9579 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -29,18 +29,19 @@ const ( // Datastore wraps the datastore client, stores the sensorID, and other // information needed when writing blocks and transactions. type Datastore struct { - client *datastore.Client - sensorID string - chainID *big.Int - maxConcurrency int - shouldWriteBlocks bool - shouldWriteBlockEvents bool - shouldWriteFirstBlockEvent bool - shouldWriteTransactions bool - shouldWriteTransactionEvents bool - shouldWritePeers bool - jobs chan struct{} - ttl time.Duration + client *datastore.Client + sensorID string + chainID *big.Int + maxConcurrency int + shouldWriteBlocks bool + shouldWriteBlockEvents bool + shouldWriteFirstBlockEvent bool + shouldWriteTransactions bool + shouldWriteTransactionEvents bool + shouldWriteFirstTransactionEvent bool + shouldWritePeers bool + ttl time.Duration + jobs chan struct{} } // DatastoreEvent can represent a peer sending the sensor a transaction hash or @@ -94,20 +95,21 @@ type DatastoreBlock struct { // not indexed because there is a max sized for indexed byte slices, which Data // will occasionally exceed. 
type DatastoreTransaction struct { - Data []byte `datastore:",noindex"` - From string - Gas string - GasFeeCap string - GasPrice string - GasTipCap string - Nonce string - To string - Value string - V, R, S string `datastore:",noindex"` - Time time.Time - TimeFirstSeen time.Time - TTL time.Time - Type int16 + Data []byte `datastore:",noindex"` + From string + Gas string `datastore:",noindex"` + GasFeeCap string `datastore:",noindex"` + GasPrice string `datastore:",noindex"` + GasTipCap string `datastore:",noindex"` + Nonce string `datastore:",noindex"` + To string + Value string `datastore:",noindex"` + V, R, S string `datastore:",noindex"` + Time time.Time + TimeFirstSeen time.Time + TTL time.Time + Type int16 + SensorFirstSeen string } type DatastorePeer struct { @@ -121,18 +123,19 @@ type DatastorePeer struct { // DatastoreOptions is used when creating a NewDatastore. type DatastoreOptions struct { - ProjectID string - DatabaseID string - SensorID string - ChainID uint64 - MaxConcurrency int - ShouldWriteBlocks bool - ShouldWriteBlockEvents bool - ShouldWriteFirstBlockEvent bool - ShouldWriteTransactions bool - ShouldWriteTransactionEvents bool - ShouldWritePeers bool - TTL time.Duration + ProjectID string + DatabaseID string + SensorID string + ChainID uint64 + MaxConcurrency int + ShouldWriteBlocks bool + ShouldWriteBlockEvents bool + ShouldWriteFirstBlockEvent bool + ShouldWriteTransactions bool + ShouldWriteTransactionEvents bool + ShouldWriteFirstTransactionEvent bool + ShouldWritePeers bool + TTL time.Duration } // NewDatastore connects to datastore and creates the client. 
This should @@ -144,18 +147,19 @@ func NewDatastore(ctx context.Context, opts DatastoreOptions) Database { } return &Datastore{ - client: client, - sensorID: opts.SensorID, - chainID: new(big.Int).SetUint64(opts.ChainID), - maxConcurrency: opts.MaxConcurrency, - shouldWriteBlocks: opts.ShouldWriteBlocks, - shouldWriteBlockEvents: opts.ShouldWriteBlockEvents, - shouldWriteFirstBlockEvent: opts.ShouldWriteFirstBlockEvent, - shouldWriteTransactions: opts.ShouldWriteTransactions, - shouldWriteTransactionEvents: opts.ShouldWriteTransactionEvents, - shouldWritePeers: opts.ShouldWritePeers, - jobs: make(chan struct{}, opts.MaxConcurrency), - ttl: opts.TTL, + client: client, + sensorID: opts.SensorID, + chainID: new(big.Int).SetUint64(opts.ChainID), + maxConcurrency: opts.MaxConcurrency, + shouldWriteBlocks: opts.ShouldWriteBlocks, + shouldWriteBlockEvents: opts.ShouldWriteBlockEvents, + shouldWriteFirstBlockEvent: opts.ShouldWriteFirstBlockEvent, + shouldWriteTransactions: opts.ShouldWriteTransactions, + shouldWriteTransactionEvents: opts.ShouldWriteTransactionEvents, + shouldWriteFirstTransactionEvent: opts.ShouldWriteFirstTransactionEvent, + shouldWritePeers: opts.ShouldWritePeers, + jobs: make(chan struct{}, opts.MaxConcurrency), + ttl: opts.TTL, } } @@ -432,22 +436,23 @@ func (d *Datastore) newDatastoreTransaction(tx *types.Transaction, tfs time.Time } return &DatastoreTransaction{ - Data: tx.Data(), - From: from, - Gas: fmt.Sprint(tx.Gas()), - GasFeeCap: tx.GasFeeCap().String(), - GasPrice: tx.GasPrice().String(), - GasTipCap: tx.GasTipCap().String(), - Nonce: fmt.Sprint(tx.Nonce()), - To: to, - Value: tx.Value().String(), - V: v.String(), - R: r.String(), - S: s.String(), - Time: tx.Time(), - TimeFirstSeen: tfs, - TTL: tfs.Add(d.ttl), - Type: int16(tx.Type()), + Data: tx.Data(), + From: from, + Gas: fmt.Sprint(tx.Gas()), + GasFeeCap: tx.GasFeeCap().String(), + GasPrice: tx.GasPrice().String(), + GasTipCap: tx.GasTipCap().String(), + Nonce: fmt.Sprint(tx.Nonce()), + 
To:              to,
+		Value:           tx.Value().String(),
+		V:               v.String(),
+		R:               r.String(),
+		S:               s.String(),
+		Time:            tx.Time(),
+		TimeFirstSeen:   tfs,
+		TTL:             tfs.Add(d.ttl),
+		Type:            int16(tx.Type()),
+		SensorFirstSeen: d.sensorID,
 	}
 }
 
@@ -639,18 +644,51 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has
 	}
 }
 
-// writeTransactions will write the transactions to datastore and return the
-// transaction hashes.
+// writeTransactions will write the transactions to datastore, skipping
+// transactions that already exist with an earlier or equal TimeFirstSeen.
 func (d *Datastore) writeTransactions(ctx context.Context, txs []*types.Transaction, tfs time.Time) {
 	keys := make([]*datastore.Key, 0, len(txs))
-	transactions := make([]*DatastoreTransaction, 0, len(txs))
-
 	for _, tx := range txs {
 		keys = append(keys, datastore.NameKey(TransactionsKind, tx.Hash().Hex(), nil))
+	}
+
+	// Fetch existing transactions to check TimeFirstSeen. A key with no stored
+	// entity is reported as ErrNoSuchEntity and leaves its slot nil, which is
+	// the expected case. Any other failure is logged, and the write below
+	// still proceeds on a best-effort basis.
+	existing := make([]*DatastoreTransaction, len(keys))
+	if err := d.client.GetMulti(ctx, keys, existing); err != nil {
+		merr, ok := err.(datastore.MultiError)
+		failed := !ok
+		for _, e := range merr {
+			if e != nil && e != datastore.ErrNoSuchEntity {
+				failed = true
+				break
+			}
+		}
+		if failed {
+			log.Warn().Err(err).Msg("Failed to read existing transactions")
+		}
+	}
+
+	transactions := make([]*DatastoreTransaction, 0, len(txs))
+	keysToWrite := make([]*datastore.Key, 0, len(txs))
+
+	for i, tx := range txs {
+		// Skip if existing record has an earlier or equal TimeFirstSeen
+		if existing[i] != nil && !existing[i].TimeFirstSeen.IsZero() && !tfs.Before(existing[i].TimeFirstSeen) {
+			continue
+		}
+
+		keysToWrite = append(keysToWrite, keys[i])
 		transactions = append(transactions, d.newDatastoreTransaction(tx, tfs))
 	}
 
-	if _, err := d.client.PutMulti(ctx, keys, transactions); err != nil {
+	if len(keysToWrite) == 0 {
+		return
+	}
+
+	if _, err := d.client.PutMulti(ctx, keysToWrite, transactions); err != nil {
 		log.Error().Err(err).Msg("Failed to write transactions")
 	}
 }
@@ -659,6 +697,10 @@ func (d *Datastore) ShouldWriteFirstBlockEvent() bool {
 	return d.shouldWriteFirstBlockEvent
 }
 
+func (d *Datastore) ShouldWriteFirstTransactionEvent() bool {
+	return 
d.shouldWriteFirstTransactionEvent
+}
+
 func (d *Datastore) NodeList(ctx context.Context, limit int) ([]string, error) {
 	query := datastore.NewQuery(BlockEventsKind).Order("-Time")
 	iter := d.client.Run(ctx, query)
diff --git a/p2p/database/json.go b/p2p/database/json.go
index d1a9ab0fc..4d6663bb6 100644
--- a/p2p/database/json.go
+++ b/p2p/database/json.go
@@ -384,6 +384,11 @@ func (j *JSONDatabase) ShouldWriteTransactions() bool {
 	return j.shouldWriteTransactions
 }
 
+// ShouldWriteFirstTransactionEvent returns false for JSON database.
+func (j *JSONDatabase) ShouldWriteFirstTransactionEvent() bool {
+	return false
+}
+
 // ShouldWriteTransactionEvents returns the configured value.
 func (j *JSONDatabase) ShouldWriteTransactionEvents() bool {
 	return j.shouldWriteTransactionEvents
diff --git a/p2p/database/nodb.go b/p2p/database/nodb.go
index 1bc003980..d1fcfaddb 100644
--- a/p2p/database/nodb.go
+++ b/p2p/database/nodb.go
@@ -84,6 +84,11 @@ func (n *nodb) ShouldWriteTransactionEvents() bool {
 	return false
 }
 
+// ShouldWriteFirstTransactionEvent returns false.
+func (n *nodb) ShouldWriteFirstTransactionEvent() bool {
+	return false
+}
+
 // ShouldWritePeers returns false.
 func (n *nodb) ShouldWritePeers() bool {
 	return false
diff --git a/p2p/protocol.go b/p2p/protocol.go
index 21fc779b3..6debdba20 100644
--- a/p2p/protocol.go
+++ b/p2p/protocol.go
@@ -832,6 +832,48 @@ func (c *conn) decodeTxs(rawTxs []rlp.RawValue) []*types.Transaction {
 	return txs
 }
 
+// processTransactions handles the common logic for processing incoming transactions
+// from both TransactionsMsg and PooledTransactionsMsg. It marks transactions as known,
+// filters duplicates via cache, writes to the database, and broadcasts what is new.
+func (c *conn) processTransactions(ctx context.Context, txs []*types.Transaction, tfs time.Time) {
+	// Mark transactions as known from this peer
+	for _, tx := range txs {
+		c.addKnownTx(tx.Hash())
+	}
+
+	// Check cache FIRST to filter out already-seen transactions
+	newTxs := make([]*types.Transaction, 0, len(txs))
+	for _, tx := range txs {
+		if _, exists := c.conns.GetTx(tx.Hash()); !exists {
+			newTxs = append(newTxs, tx)
+		}
+	}
+
+	// Add to cache BEFORE writing to narrow the window in which another peer's
+	// handler sees the same cache miss. The lookup above and this insert are
+	// separate calls, so concurrent handlers can still both report a miss.
+	hashes := c.conns.AddTxs(newTxs)
+
+	// Transaction events record each peer's sighting, so when they are enabled
+	// the whole batch is written; otherwise only cache misses need a DB write.
+	toWrite := newTxs
+	if c.db.ShouldWriteTransactionEvents() {
+		toWrite = txs
+	}
+	if len(toWrite) > 0 {
+		c.db.WriteTransactions(ctx, c.node, toWrite, tfs)
+	}
+
+	// Nothing new to relay: skip spawning no-op broadcast goroutines
+	if len(newTxs) == 0 {
+		return
+	}
+
+	// Broadcast transactions or hashes to other peers asynchronously
+	go c.conns.BroadcastTxs(types.Transactions(newTxs))
+	go c.conns.BroadcastTxHashes(hashes)
+}
+
 // encodeBlockBody converts a block to an eth.BlockBody with RLP-encoded fields.
 func encodeBlockBody(block *types.Block) (*eth.BlockBody, error) {
 	txList, err := rlp.EncodeToRawList([]*types.Transaction(block.Transactions()))
@@ -870,25 +912,8 @@ func (c *conn) handleTransactions(ctx context.Context, msg ethp2p.Msg) error {
 	}
 
 	txs := c.decodeTxs(rawTxs)
-	tfs := time.Now()
-
 	c.countMsgReceived((&eth.TransactionsPacket{}).Name(), float64(len(txs)))
-
-	// Mark transactions as known from this peer
-	for _, tx := range txs {
-		c.addKnownTx(tx.Hash())
-	}
-
-	if len(txs) > 0 {
-		c.db.WriteTransactions(ctx, c.node, txs, tfs)
-	}
-
-	// Cache transactions for duplicate detection and serving to peers (single lock)
-	hashes := c.conns.AddTxs(txs)
-
-	// Broadcast transactions or hashes to other peers asynchronously
-	go c.conns.BroadcastTxs(types.Transactions(txs))
-	go c.conns.BroadcastTxHashes(hashes)
+	c.processTransactions(ctx, txs, time.Now())
 
 	return nil
 }
@@ -1234,7 +1259,8 @@ func (c *conn) handleNewPooledTransactionHashes(version uint, msg ethp2p.Msg) er
 
 	c.countMsgReceived(name, float64(len(hashes)))
 
-	if !c.db.ShouldWriteTransactions() || !c.db.ShouldWriteTransactionEvents() {
+	// Only fetch announced transactions when some writer will consume them
+	if !c.db.ShouldWriteTransactions() && !c.db.ShouldWriteTransactionEvents() && !c.db.ShouldWriteFirstTransactionEvent() {
 		return nil
 	}
 
@@ -1256,26 +1282,8 @@ func (c *conn) handlePooledTransactions(ctx context.Context, msg ethp2p.Msg) err
 	}
 
 	txs := c.decodeTxs(raw.Txs)
-
-	tfs := time.Now()
-
 	c.countMsgReceived((*eth.PooledTransactionsPacket)(nil).Name(), float64(len(txs)))
-
-	// Mark transactions as known from this peer
-	for _, tx := range txs {
-		c.addKnownTx(tx.Hash())
-	}
-
-	if len(txs) > 0 {
-		c.db.WriteTransactions(ctx, c.node, txs, tfs)
-	}
-
-	// Cache transactions for duplicate detection and serving to peers (single lock)
-	hashes := c.conns.AddTxs(txs)
-
-	// Broadcast transactions or hashes to other peers asynchronously
-	go c.conns.BroadcastTxs(types.Transactions(txs))
-	go c.conns.BroadcastTxHashes(hashes)
+	c.processTransactions(ctx, txs, time.Now())
 
 	return nil
 }