Skip to content

Commit 87a5e26

Browse files
committed
fix: improve p2p sensor transaction writing consistency
- Change the handleNewPooledTransactionHashes condition from || to && so transactions are requested when either --write-txs or --write-tx-events is enabled (previously both were required)
- Preserve the earliest TimeFirstSeen for transactions by checking existing records before writing, matching block behavior
- Skip writing transactions that already exist with an earlier or equal timestamp
- Add a --write-first-tx-event flag for write-first-event-only behavior
- Add a SensorFirstSeen field to DatastoreTransaction for sensor attribution
1 parent c98ee8f commit 87a5e26

7 files changed

Lines changed: 130 additions & 87 deletions

File tree

cmd/p2p/sensor/sensor.go

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,10 @@ type (
4848
ShouldWriteBlocks bool
4949
ShouldWriteBlockEvents bool
5050
ShouldWriteFirstBlockEvent bool
51-
ShouldWriteTransactions bool
52-
ShouldWriteTransactionEvents bool
53-
ShouldWritePeers bool
51+
ShouldWriteTransactions bool
52+
ShouldWriteTransactionEvents bool
53+
ShouldWriteFirstTransactionEvent bool
54+
ShouldWritePeers bool
5455
ShouldBroadcastTx bool
5556
ShouldBroadcastTxHashes bool
5657
ShouldBroadcastBlocks bool
@@ -437,18 +438,19 @@ func newDatabase(ctx context.Context) (database.Database, error) {
437438
switch inputSensorParams.Database {
438439
case "datastore":
439440
return database.NewDatastore(ctx, database.DatastoreOptions{
440-
ProjectID: inputSensorParams.ProjectID,
441-
DatabaseID: inputSensorParams.DatabaseID,
442-
SensorID: inputSensorParams.SensorID,
443-
ChainID: inputSensorParams.NetworkID,
444-
MaxConcurrency: inputSensorParams.MaxDatabaseConcurrency,
445-
ShouldWriteBlocks: inputSensorParams.ShouldWriteBlocks,
446-
ShouldWriteBlockEvents: inputSensorParams.ShouldWriteBlockEvents,
447-
ShouldWriteFirstBlockEvent: inputSensorParams.ShouldWriteFirstBlockEvent,
448-
ShouldWriteTransactions: inputSensorParams.ShouldWriteTransactions,
449-
ShouldWriteTransactionEvents: inputSensorParams.ShouldWriteTransactionEvents,
450-
ShouldWritePeers: inputSensorParams.ShouldWritePeers,
451-
TTL: inputSensorParams.TTL,
441+
ProjectID: inputSensorParams.ProjectID,
442+
DatabaseID: inputSensorParams.DatabaseID,
443+
SensorID: inputSensorParams.SensorID,
444+
ChainID: inputSensorParams.NetworkID,
445+
MaxConcurrency: inputSensorParams.MaxDatabaseConcurrency,
446+
ShouldWriteBlocks: inputSensorParams.ShouldWriteBlocks,
447+
ShouldWriteBlockEvents: inputSensorParams.ShouldWriteBlockEvents,
448+
ShouldWriteFirstBlockEvent: inputSensorParams.ShouldWriteFirstBlockEvent,
449+
ShouldWriteTransactions: inputSensorParams.ShouldWriteTransactions,
450+
ShouldWriteTransactionEvents: inputSensorParams.ShouldWriteTransactionEvents,
451+
ShouldWriteFirstTransactionEvent: inputSensorParams.ShouldWriteFirstTransactionEvent,
452+
ShouldWritePeers: inputSensorParams.ShouldWritePeers,
453+
TTL: inputSensorParams.TTL,
452454
}), nil
453455
case "json":
454456
return database.NewJSONDatabase(database.JSONDatabaseOptions{
@@ -489,6 +491,8 @@ will result in less chance of missing data but can significantly increase memory
489491
`write transactions to database (this option can significantly increase CPU and memory usage)`)
490492
f.BoolVar(&inputSensorParams.ShouldWriteTransactionEvents, "write-tx-events", true,
491493
`write transaction events to database (this option can significantly increase CPU and memory usage)`)
494+
f.BoolVar(&inputSensorParams.ShouldWriteFirstTransactionEvent, "write-first-tx-event", false,
495+
"write one transaction event on first-seen only (requires --write-tx-events=false)")
492496
f.BoolVar(&inputSensorParams.ShouldWritePeers, "write-peers", true, "write peers to database")
493497
f.BoolVar(&inputSensorParams.ShouldBroadcastTx, "broadcast-txs", false, "broadcast full transactions to peers")
494498
f.BoolVar(&inputSensorParams.ShouldBroadcastTxHashes, "broadcast-tx-hashes", false, "broadcast transaction hashes to peers")

doc/polycli_p2p_sensor.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ polycli p2p sensor amoy-nodes.json \
178178
--write-block-events write block events to database (default true)
179179
-B, --write-blocks write blocks to database (default true)
180180
--write-first-block-event write one block event on first-seen only (requires --write-block-events=false)
181+
--write-first-tx-event write one transaction event on first-seen only (requires --write-tx-events=false)
181182
--write-peers write peers to database (default true)
182183
--write-tx-events write transaction events to database (this option can significantly increase CPU and memory usage) (default true)
183184
-t, --write-txs write transactions to database (this option can significantly increase CPU and memory usage) (default true)

p2p/database/database.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ type Database interface {
5454
ShouldWriteFirstBlockEvent() bool
5555
ShouldWriteTransactions() bool
5656
ShouldWriteTransactionEvents() bool
57+
ShouldWriteFirstTransactionEvent() bool
5758
ShouldWritePeers() bool
5859

5960
// NodeList will return a list of enode URLs.

p2p/database/datastore.go

Lines changed: 98 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,19 @@ const (
2929
// Datastore wraps the datastore client, stores the sensorID, and other
3030
// information needed when writing blocks and transactions.
3131
type Datastore struct {
32-
client *datastore.Client
33-
sensorID string
34-
chainID *big.Int
35-
maxConcurrency int
36-
shouldWriteBlocks bool
37-
shouldWriteBlockEvents bool
38-
shouldWriteFirstBlockEvent bool
39-
shouldWriteTransactions bool
40-
shouldWriteTransactionEvents bool
41-
shouldWritePeers bool
42-
jobs chan struct{}
43-
ttl time.Duration
32+
client *datastore.Client
33+
sensorID string
34+
chainID *big.Int
35+
maxConcurrency int
36+
shouldWriteBlocks bool
37+
shouldWriteBlockEvents bool
38+
shouldWriteFirstBlockEvent bool
39+
shouldWriteTransactions bool
40+
shouldWriteTransactionEvents bool
41+
shouldWriteFirstTransactionEvent bool
42+
shouldWritePeers bool
43+
jobs chan struct{}
44+
ttl time.Duration
4445
}
4546

4647
// DatastoreEvent can represent a peer sending the sensor a transaction hash or
@@ -94,20 +95,21 @@ type DatastoreBlock struct {
9495
// not indexed because there is a max sized for indexed byte slices, which Data
9596
// will occasionally exceed.
9697
type DatastoreTransaction struct {
97-
Data []byte `datastore:",noindex"`
98-
From string
99-
Gas string
100-
GasFeeCap string
101-
GasPrice string
102-
GasTipCap string
103-
Nonce string
104-
To string
105-
Value string
106-
V, R, S string `datastore:",noindex"`
107-
Time time.Time
108-
TimeFirstSeen time.Time
109-
TTL time.Time
110-
Type int16
98+
Data []byte `datastore:",noindex"`
99+
From string
100+
Gas string
101+
GasFeeCap string
102+
GasPrice string
103+
GasTipCap string
104+
Nonce string
105+
To string
106+
Value string
107+
V, R, S string `datastore:",noindex"`
108+
Time time.Time
109+
TimeFirstSeen time.Time
110+
TTL time.Time
111+
Type int16
112+
SensorFirstSeen string
111113
}
112114

113115
type DatastorePeer struct {
@@ -121,18 +123,19 @@ type DatastorePeer struct {
121123

122124
// DatastoreOptions is used when creating a NewDatastore.
123125
type DatastoreOptions struct {
124-
ProjectID string
125-
DatabaseID string
126-
SensorID string
127-
ChainID uint64
128-
MaxConcurrency int
129-
ShouldWriteBlocks bool
130-
ShouldWriteBlockEvents bool
131-
ShouldWriteFirstBlockEvent bool
132-
ShouldWriteTransactions bool
133-
ShouldWriteTransactionEvents bool
134-
ShouldWritePeers bool
135-
TTL time.Duration
126+
ProjectID string
127+
DatabaseID string
128+
SensorID string
129+
ChainID uint64
130+
MaxConcurrency int
131+
ShouldWriteBlocks bool
132+
ShouldWriteBlockEvents bool
133+
ShouldWriteFirstBlockEvent bool
134+
ShouldWriteTransactions bool
135+
ShouldWriteTransactionEvents bool
136+
ShouldWriteFirstTransactionEvent bool
137+
ShouldWritePeers bool
138+
TTL time.Duration
136139
}
137140

138141
// NewDatastore connects to datastore and creates the client. This should
@@ -144,18 +147,19 @@ func NewDatastore(ctx context.Context, opts DatastoreOptions) Database {
144147
}
145148

146149
return &Datastore{
147-
client: client,
148-
sensorID: opts.SensorID,
149-
chainID: new(big.Int).SetUint64(opts.ChainID),
150-
maxConcurrency: opts.MaxConcurrency,
151-
shouldWriteBlocks: opts.ShouldWriteBlocks,
152-
shouldWriteBlockEvents: opts.ShouldWriteBlockEvents,
153-
shouldWriteFirstBlockEvent: opts.ShouldWriteFirstBlockEvent,
154-
shouldWriteTransactions: opts.ShouldWriteTransactions,
155-
shouldWriteTransactionEvents: opts.ShouldWriteTransactionEvents,
156-
shouldWritePeers: opts.ShouldWritePeers,
157-
jobs: make(chan struct{}, opts.MaxConcurrency),
158-
ttl: opts.TTL,
150+
client: client,
151+
sensorID: opts.SensorID,
152+
chainID: new(big.Int).SetUint64(opts.ChainID),
153+
maxConcurrency: opts.MaxConcurrency,
154+
shouldWriteBlocks: opts.ShouldWriteBlocks,
155+
shouldWriteBlockEvents: opts.ShouldWriteBlockEvents,
156+
shouldWriteFirstBlockEvent: opts.ShouldWriteFirstBlockEvent,
157+
shouldWriteTransactions: opts.ShouldWriteTransactions,
158+
shouldWriteTransactionEvents: opts.ShouldWriteTransactionEvents,
159+
shouldWriteFirstTransactionEvent: opts.ShouldWriteFirstTransactionEvent,
160+
shouldWritePeers: opts.ShouldWritePeers,
161+
jobs: make(chan struct{}, opts.MaxConcurrency),
162+
ttl: opts.TTL,
159163
}
160164
}
161165

@@ -432,22 +436,23 @@ func (d *Datastore) newDatastoreTransaction(tx *types.Transaction, tfs time.Time
432436
}
433437

434438
return &DatastoreTransaction{
435-
Data: tx.Data(),
436-
From: from,
437-
Gas: fmt.Sprint(tx.Gas()),
438-
GasFeeCap: tx.GasFeeCap().String(),
439-
GasPrice: tx.GasPrice().String(),
440-
GasTipCap: tx.GasTipCap().String(),
441-
Nonce: fmt.Sprint(tx.Nonce()),
442-
To: to,
443-
Value: tx.Value().String(),
444-
V: v.String(),
445-
R: r.String(),
446-
S: s.String(),
447-
Time: tx.Time(),
448-
TimeFirstSeen: tfs,
449-
TTL: tfs.Add(d.ttl),
450-
Type: int16(tx.Type()),
439+
Data: tx.Data(),
440+
From: from,
441+
Gas: fmt.Sprint(tx.Gas()),
442+
GasFeeCap: tx.GasFeeCap().String(),
443+
GasPrice: tx.GasPrice().String(),
444+
GasTipCap: tx.GasTipCap().String(),
445+
Nonce: fmt.Sprint(tx.Nonce()),
446+
To: to,
447+
Value: tx.Value().String(),
448+
V: v.String(),
449+
R: r.String(),
450+
S: s.String(),
451+
Time: tx.Time(),
452+
TimeFirstSeen: tfs,
453+
TTL: tfs.Add(d.ttl),
454+
Type: int16(tx.Type()),
455+
SensorFirstSeen: d.sensorID,
451456
}
452457
}
453458

@@ -639,18 +644,36 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has
639644
}
640645
}
641646

642-
// writeTransactions will write the transactions to datastore and return the
643-
// transaction hashes.
647+
// writeTransactions will write the transactions to datastore, skipping
648+
// transactions that already exist with an earlier or equal TimeFirstSeen.
644649
func (d *Datastore) writeTransactions(ctx context.Context, txs []*types.Transaction, tfs time.Time) {
645650
keys := make([]*datastore.Key, 0, len(txs))
646-
transactions := make([]*DatastoreTransaction, 0, len(txs))
647-
648651
for _, tx := range txs {
649652
keys = append(keys, datastore.NameKey(TransactionsKind, tx.Hash().Hex(), nil))
653+
}
654+
655+
// Fetch existing transactions to check TimeFirstSeen
656+
existing := make([]*DatastoreTransaction, len(keys))
657+
_ = d.client.GetMulti(ctx, keys, existing) // Ignore errors - missing keys return nil
658+
659+
transactions := make([]*DatastoreTransaction, 0, len(txs))
660+
keysToWrite := make([]*datastore.Key, 0, len(txs))
661+
662+
for i, tx := range txs {
663+
// Skip if existing record has an earlier or equal TimeFirstSeen
664+
if existing[i] != nil && !existing[i].TimeFirstSeen.IsZero() && !tfs.Before(existing[i].TimeFirstSeen) {
665+
continue
666+
}
667+
668+
keysToWrite = append(keysToWrite, keys[i])
650669
transactions = append(transactions, d.newDatastoreTransaction(tx, tfs))
651670
}
652671

653-
if _, err := d.client.PutMulti(ctx, keys, transactions); err != nil {
672+
if len(keysToWrite) == 0 {
673+
return
674+
}
675+
676+
if _, err := d.client.PutMulti(ctx, keysToWrite, transactions); err != nil {
654677
log.Error().Err(err).Msg("Failed to write transactions")
655678
}
656679
}
@@ -659,6 +682,10 @@ func (d *Datastore) ShouldWriteFirstBlockEvent() bool {
659682
return d.shouldWriteFirstBlockEvent
660683
}
661684

685+
func (d *Datastore) ShouldWriteFirstTransactionEvent() bool {
686+
return d.shouldWriteFirstTransactionEvent
687+
}
688+
662689
func (d *Datastore) NodeList(ctx context.Context, limit int) ([]string, error) {
663690
query := datastore.NewQuery(BlockEventsKind).Order("-Time")
664691
iter := d.client.Run(ctx, query)

p2p/database/json.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,11 @@ func (j *JSONDatabase) ShouldWriteTransactions() bool {
384384
return j.shouldWriteTransactions
385385
}
386386

387+
// ShouldWriteFirstTransactionEvent returns false for JSON database.
388+
func (j *JSONDatabase) ShouldWriteFirstTransactionEvent() bool {
389+
return false
390+
}
391+
387392
// ShouldWriteTransactionEvents returns the configured value.
388393
func (j *JSONDatabase) ShouldWriteTransactionEvents() bool {
389394
return j.shouldWriteTransactionEvents

p2p/database/nodb.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ func (n *nodb) ShouldWriteTransactionEvents() bool {
8484
return false
8585
}
8686

87+
// ShouldWriteFirstTransactionEvent returns false.
88+
func (n *nodb) ShouldWriteFirstTransactionEvent() bool {
89+
return false
90+
}
91+
8792
// ShouldWritePeers returns false.
8893
func (n *nodb) ShouldWritePeers() bool {
8994
return false

p2p/protocol.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1234,7 +1234,7 @@ func (c *conn) handleNewPooledTransactionHashes(version uint, msg ethp2p.Msg) er
12341234

12351235
c.countMsgReceived(name, float64(len(hashes)))
12361236

1237-
if !c.db.ShouldWriteTransactions() || !c.db.ShouldWriteTransactionEvents() {
1237+
if !c.db.ShouldWriteTransactions() && !c.db.ShouldWriteTransactionEvents() {
12381238
return nil
12391239
}
12401240

0 commit comments

Comments
 (0)