Skip to content

Commit 8ecdb0f

Browse files
minhd-vu and claude authored
fix(p2p/sensor): improve transaction writing consistency (#909)
* fix: improve p2p sensor transaction writing consistency - Change handleNewPooledTransactionHashes condition from || to && so transactions are requested when either --write-txs or --write-tx-events is enabled (previously required both) - Preserve earliest TimeFirstSeen for transactions by checking existing records before writing, matching block behavior - Skip writing transactions that already exist with earlier timestamp - Add --write-first-tx-event flag for write-first-event-only behavior - Add SensorFirstSeen field to DatastoreTransaction for sensor attribution * feat: add worker pool for transaction writes and cache-first deduplication - Add fixed worker pool (default 100 workers) for transaction and transaction event writes using unbounded linked list queue - Check transaction cache before writing to deduplicate across peers - Add Close() method to Database interface for graceful shutdown - Add --write-workers flag to configure worker count - Keep semaphore pattern for block writes (lower volume) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * refactor: revert to semaphore pattern for transaction writes Remove the worker pool implementation and revert to the original semaphore pattern for database writes. The cache-first transaction deduplication (added in previous commit) should significantly reduce write volume, making the simpler semaphore pattern sufficient. Changes: - Remove worker pool types, queue, and worker goroutines from datastore.go - Remove WriteWorkers from DatastoreOptions - Remove --write-workers flag from sensor command - Keep using runAsync() semaphore pattern for all database writes Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * chore: go fmt * fix: reduce indexes * refactor: remove unused Close() method from Database interface The Close() method was added for the worker pool implementation but is no longer needed after reverting to the semaphore pattern. 
With the semaphore pattern, in-flight goroutines complete on their own without needing explicit shutdown coordination. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * refactor: extract processTransactions helper to reduce duplication - Extract common transaction processing logic into processTransactions() - Consolidates duplicate code from handleTransactions and handlePooledTransactions - Remove orphaned comment about Close() method - Remove duplicate WriteTransactions comment Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent add3d11 commit 8ecdb0f

7 files changed

Lines changed: 213 additions & 173 deletions

File tree

cmd/p2p/sensor/sensor.go

Lines changed: 68 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -35,58 +35,59 @@ import (
3535

3636
type (
3737
sensorParams struct {
38-
Bootnodes string
39-
NetworkID uint64
40-
NodesFile string
41-
StaticNodesFile string
42-
TrustedNodesFile string
43-
ProjectID string
44-
DatabaseID string
45-
SensorID string
46-
MaxPeers int
47-
MaxDatabaseConcurrency int
48-
ShouldWriteBlocks bool
49-
ShouldWriteBlockEvents bool
50-
ShouldWriteFirstBlockEvent bool
51-
ShouldWriteTransactions bool
52-
ShouldWriteTransactionEvents bool
53-
ShouldWritePeers bool
54-
ShouldBroadcastTx bool
55-
ShouldBroadcastTxHashes bool
56-
ShouldBroadcastBlocks bool
57-
ShouldBroadcastBlockHashes bool
58-
BroadcastWorkers int
59-
TxBatchTimeout time.Duration
60-
TxBroadcastQueueSize int
61-
MaxTxPacketSize int
62-
MaxQueuedTxs int
63-
ShouldRunPprof bool
64-
PprofPort uint
65-
ShouldRunPrometheus bool
66-
PrometheusPort uint
67-
APIPort uint
68-
RPCPort uint
69-
KeyFile string
70-
PrivateKey string
71-
Port int
72-
DiscoveryPort int
73-
RPC string
74-
GenesisHash string
75-
ForkID []byte
76-
DialRatio int
77-
NAT string
78-
TTL time.Duration
79-
DiscoveryDNS string
80-
Database string
81-
NoDiscovery bool
82-
ProxyRPC bool
83-
ProxyRPCTimeout time.Duration
84-
RequestsCache ds.LRUOptions
85-
ParentsCache ds.LRUOptions
86-
BlocksCache ds.LRUOptions
87-
TxsCache ds.LRUOptions
88-
KnownTxsBloom ds.BloomSetOptions
89-
KnownBlocksMax int
38+
Bootnodes string
39+
NetworkID uint64
40+
NodesFile string
41+
StaticNodesFile string
42+
TrustedNodesFile string
43+
ProjectID string
44+
DatabaseID string
45+
SensorID string
46+
MaxPeers int
47+
MaxDatabaseConcurrency int
48+
ShouldWriteBlocks bool
49+
ShouldWriteBlockEvents bool
50+
ShouldWriteFirstBlockEvent bool
51+
ShouldWriteTransactions bool
52+
ShouldWriteTransactionEvents bool
53+
ShouldWriteFirstTransactionEvent bool
54+
ShouldWritePeers bool
55+
ShouldBroadcastTx bool
56+
ShouldBroadcastTxHashes bool
57+
ShouldBroadcastBlocks bool
58+
ShouldBroadcastBlockHashes bool
59+
BroadcastWorkers int
60+
TxBatchTimeout time.Duration
61+
TxBroadcastQueueSize int
62+
MaxTxPacketSize int
63+
MaxQueuedTxs int
64+
ShouldRunPprof bool
65+
PprofPort uint
66+
ShouldRunPrometheus bool
67+
PrometheusPort uint
68+
APIPort uint
69+
RPCPort uint
70+
KeyFile string
71+
PrivateKey string
72+
Port int
73+
DiscoveryPort int
74+
RPC string
75+
GenesisHash string
76+
ForkID []byte
77+
DialRatio int
78+
NAT string
79+
TTL time.Duration
80+
DiscoveryDNS string
81+
Database string
82+
NoDiscovery bool
83+
ProxyRPC bool
84+
ProxyRPCTimeout time.Duration
85+
RequestsCache ds.LRUOptions
86+
ParentsCache ds.LRUOptions
87+
BlocksCache ds.LRUOptions
88+
TxsCache ds.LRUOptions
89+
KnownTxsBloom ds.BloomSetOptions
90+
KnownBlocksMax int
9091

9192
bootnodes []*enode.Node
9293
staticNodes []*enode.Node
@@ -437,18 +438,19 @@ func newDatabase(ctx context.Context) (database.Database, error) {
437438
switch inputSensorParams.Database {
438439
case "datastore":
439440
return database.NewDatastore(ctx, database.DatastoreOptions{
440-
ProjectID: inputSensorParams.ProjectID,
441-
DatabaseID: inputSensorParams.DatabaseID,
442-
SensorID: inputSensorParams.SensorID,
443-
ChainID: inputSensorParams.NetworkID,
444-
MaxConcurrency: inputSensorParams.MaxDatabaseConcurrency,
445-
ShouldWriteBlocks: inputSensorParams.ShouldWriteBlocks,
446-
ShouldWriteBlockEvents: inputSensorParams.ShouldWriteBlockEvents,
447-
ShouldWriteFirstBlockEvent: inputSensorParams.ShouldWriteFirstBlockEvent,
448-
ShouldWriteTransactions: inputSensorParams.ShouldWriteTransactions,
449-
ShouldWriteTransactionEvents: inputSensorParams.ShouldWriteTransactionEvents,
450-
ShouldWritePeers: inputSensorParams.ShouldWritePeers,
451-
TTL: inputSensorParams.TTL,
441+
ProjectID: inputSensorParams.ProjectID,
442+
DatabaseID: inputSensorParams.DatabaseID,
443+
SensorID: inputSensorParams.SensorID,
444+
ChainID: inputSensorParams.NetworkID,
445+
MaxConcurrency: inputSensorParams.MaxDatabaseConcurrency,
446+
ShouldWriteBlocks: inputSensorParams.ShouldWriteBlocks,
447+
ShouldWriteBlockEvents: inputSensorParams.ShouldWriteBlockEvents,
448+
ShouldWriteFirstBlockEvent: inputSensorParams.ShouldWriteFirstBlockEvent,
449+
ShouldWriteTransactions: inputSensorParams.ShouldWriteTransactions,
450+
ShouldWriteTransactionEvents: inputSensorParams.ShouldWriteTransactionEvents,
451+
ShouldWriteFirstTransactionEvent: inputSensorParams.ShouldWriteFirstTransactionEvent,
452+
ShouldWritePeers: inputSensorParams.ShouldWritePeers,
453+
TTL: inputSensorParams.TTL,
452454
}), nil
453455
case "json":
454456
return database.NewJSONDatabase(database.JSONDatabaseOptions{
@@ -489,6 +491,8 @@ will result in less chance of missing data but can significantly increase memory
489491
`write transactions to database (this option can significantly increase CPU and memory usage)`)
490492
f.BoolVar(&inputSensorParams.ShouldWriteTransactionEvents, "write-tx-events", true,
491493
`write transaction events to database (this option can significantly increase CPU and memory usage)`)
494+
f.BoolVar(&inputSensorParams.ShouldWriteFirstTransactionEvent, "write-first-tx-event", false,
495+
"write one transaction event on first-seen only (requires --write-tx-events=false)")
492496
f.BoolVar(&inputSensorParams.ShouldWritePeers, "write-peers", true, "write peers to database")
493497
f.BoolVar(&inputSensorParams.ShouldBroadcastTx, "broadcast-txs", false, "broadcast full transactions to peers")
494498
f.BoolVar(&inputSensorParams.ShouldBroadcastTxHashes, "broadcast-tx-hashes", false, "broadcast transaction hashes to peers")

doc/polycli_p2p_sensor.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ polycli p2p sensor amoy-nodes.json \
178178
--write-block-events write block events to database (default true)
179179
-B, --write-blocks write blocks to database (default true)
180180
--write-first-block-event write one block event on first-seen only (requires --write-block-events=false)
181+
--write-first-tx-event write one transaction event on first-seen only (requires --write-tx-events=false)
181182
--write-peers write peers to database (default true)
182183
--write-tx-events write transaction events to database (this option can significantly increase CPU and memory usage) (default true)
183184
-t, --write-txs write transactions to database (this option can significantly increase CPU and memory usage) (default true)

p2p/database/database.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ type Database interface {
5454
ShouldWriteFirstBlockEvent() bool
5555
ShouldWriteTransactions() bool
5656
ShouldWriteTransactionEvents() bool
57+
ShouldWriteFirstTransactionEvent() bool
5758
ShouldWritePeers() bool
5859

5960
// NodeList will return a list of enode URLs.

p2p/database/datastore.go

Lines changed: 98 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,19 @@ const (
2929
// Datastore wraps the datastore client, stores the sensorID, and other
3030
// information needed when writing blocks and transactions.
3131
type Datastore struct {
32-
client *datastore.Client
33-
sensorID string
34-
chainID *big.Int
35-
maxConcurrency int
36-
shouldWriteBlocks bool
37-
shouldWriteBlockEvents bool
38-
shouldWriteFirstBlockEvent bool
39-
shouldWriteTransactions bool
40-
shouldWriteTransactionEvents bool
41-
shouldWritePeers bool
42-
jobs chan struct{}
43-
ttl time.Duration
32+
client *datastore.Client
33+
sensorID string
34+
chainID *big.Int
35+
maxConcurrency int
36+
shouldWriteBlocks bool
37+
shouldWriteBlockEvents bool
38+
shouldWriteFirstBlockEvent bool
39+
shouldWriteTransactions bool
40+
shouldWriteTransactionEvents bool
41+
shouldWriteFirstTransactionEvent bool
42+
shouldWritePeers bool
43+
ttl time.Duration
44+
jobs chan struct{}
4445
}
4546

4647
// DatastoreEvent can represent a peer sending the sensor a transaction hash or
@@ -94,20 +95,21 @@ type DatastoreBlock struct {
9495
// not indexed because there is a max sized for indexed byte slices, which Data
9596
// will occasionally exceed.
9697
type DatastoreTransaction struct {
97-
Data []byte `datastore:",noindex"`
98-
From string
99-
Gas string
100-
GasFeeCap string
101-
GasPrice string
102-
GasTipCap string
103-
Nonce string
104-
To string
105-
Value string
106-
V, R, S string `datastore:",noindex"`
107-
Time time.Time
108-
TimeFirstSeen time.Time
109-
TTL time.Time
110-
Type int16
98+
Data []byte `datastore:",noindex"`
99+
From string
100+
Gas string `datastore:",noindex"`
101+
GasFeeCap string `datastore:",noindex"`
102+
GasPrice string `datastore:",noindex"`
103+
GasTipCap string `datastore:",noindex"`
104+
Nonce string `datastore:",noindex"`
105+
To string
106+
Value string `datastore:",noindex"`
107+
V, R, S string `datastore:",noindex"`
108+
Time time.Time
109+
TimeFirstSeen time.Time
110+
TTL time.Time
111+
Type int16
112+
SensorFirstSeen string
111113
}
112114

113115
type DatastorePeer struct {
@@ -121,18 +123,19 @@ type DatastorePeer struct {
121123

122124
// DatastoreOptions is used when creating a NewDatastore.
123125
type DatastoreOptions struct {
124-
ProjectID string
125-
DatabaseID string
126-
SensorID string
127-
ChainID uint64
128-
MaxConcurrency int
129-
ShouldWriteBlocks bool
130-
ShouldWriteBlockEvents bool
131-
ShouldWriteFirstBlockEvent bool
132-
ShouldWriteTransactions bool
133-
ShouldWriteTransactionEvents bool
134-
ShouldWritePeers bool
135-
TTL time.Duration
126+
ProjectID string
127+
DatabaseID string
128+
SensorID string
129+
ChainID uint64
130+
MaxConcurrency int
131+
ShouldWriteBlocks bool
132+
ShouldWriteBlockEvents bool
133+
ShouldWriteFirstBlockEvent bool
134+
ShouldWriteTransactions bool
135+
ShouldWriteTransactionEvents bool
136+
ShouldWriteFirstTransactionEvent bool
137+
ShouldWritePeers bool
138+
TTL time.Duration
136139
}
137140

138141
// NewDatastore connects to datastore and creates the client. This should
@@ -144,18 +147,19 @@ func NewDatastore(ctx context.Context, opts DatastoreOptions) Database {
144147
}
145148

146149
return &Datastore{
147-
client: client,
148-
sensorID: opts.SensorID,
149-
chainID: new(big.Int).SetUint64(opts.ChainID),
150-
maxConcurrency: opts.MaxConcurrency,
151-
shouldWriteBlocks: opts.ShouldWriteBlocks,
152-
shouldWriteBlockEvents: opts.ShouldWriteBlockEvents,
153-
shouldWriteFirstBlockEvent: opts.ShouldWriteFirstBlockEvent,
154-
shouldWriteTransactions: opts.ShouldWriteTransactions,
155-
shouldWriteTransactionEvents: opts.ShouldWriteTransactionEvents,
156-
shouldWritePeers: opts.ShouldWritePeers,
157-
jobs: make(chan struct{}, opts.MaxConcurrency),
158-
ttl: opts.TTL,
150+
client: client,
151+
sensorID: opts.SensorID,
152+
chainID: new(big.Int).SetUint64(opts.ChainID),
153+
maxConcurrency: opts.MaxConcurrency,
154+
shouldWriteBlocks: opts.ShouldWriteBlocks,
155+
shouldWriteBlockEvents: opts.ShouldWriteBlockEvents,
156+
shouldWriteFirstBlockEvent: opts.ShouldWriteFirstBlockEvent,
157+
shouldWriteTransactions: opts.ShouldWriteTransactions,
158+
shouldWriteTransactionEvents: opts.ShouldWriteTransactionEvents,
159+
shouldWriteFirstTransactionEvent: opts.ShouldWriteFirstTransactionEvent,
160+
shouldWritePeers: opts.ShouldWritePeers,
161+
jobs: make(chan struct{}, opts.MaxConcurrency),
162+
ttl: opts.TTL,
159163
}
160164
}
161165

@@ -432,22 +436,23 @@ func (d *Datastore) newDatastoreTransaction(tx *types.Transaction, tfs time.Time
432436
}
433437

434438
return &DatastoreTransaction{
435-
Data: tx.Data(),
436-
From: from,
437-
Gas: fmt.Sprint(tx.Gas()),
438-
GasFeeCap: tx.GasFeeCap().String(),
439-
GasPrice: tx.GasPrice().String(),
440-
GasTipCap: tx.GasTipCap().String(),
441-
Nonce: fmt.Sprint(tx.Nonce()),
442-
To: to,
443-
Value: tx.Value().String(),
444-
V: v.String(),
445-
R: r.String(),
446-
S: s.String(),
447-
Time: tx.Time(),
448-
TimeFirstSeen: tfs,
449-
TTL: tfs.Add(d.ttl),
450-
Type: int16(tx.Type()),
439+
Data: tx.Data(),
440+
From: from,
441+
Gas: fmt.Sprint(tx.Gas()),
442+
GasFeeCap: tx.GasFeeCap().String(),
443+
GasPrice: tx.GasPrice().String(),
444+
GasTipCap: tx.GasTipCap().String(),
445+
Nonce: fmt.Sprint(tx.Nonce()),
446+
To: to,
447+
Value: tx.Value().String(),
448+
V: v.String(),
449+
R: r.String(),
450+
S: s.String(),
451+
Time: tx.Time(),
452+
TimeFirstSeen: tfs,
453+
TTL: tfs.Add(d.ttl),
454+
Type: int16(tx.Type()),
455+
SensorFirstSeen: d.sensorID,
451456
}
452457
}
453458

@@ -639,18 +644,36 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has
639644
}
640645
}
641646

642-
// writeTransactions will write the transactions to datastore and return the
643-
// transaction hashes.
647+
// writeTransactions will write the transactions to datastore, skipping
648+
// transactions that already exist with an earlier or equal TimeFirstSeen.
644649
func (d *Datastore) writeTransactions(ctx context.Context, txs []*types.Transaction, tfs time.Time) {
645650
keys := make([]*datastore.Key, 0, len(txs))
646-
transactions := make([]*DatastoreTransaction, 0, len(txs))
647-
648651
for _, tx := range txs {
649652
keys = append(keys, datastore.NameKey(TransactionsKind, tx.Hash().Hex(), nil))
653+
}
654+
655+
// Fetch existing transactions to check TimeFirstSeen
656+
existing := make([]*DatastoreTransaction, len(keys))
657+
_ = d.client.GetMulti(ctx, keys, existing) // Ignore errors - missing keys return nil
658+
659+
transactions := make([]*DatastoreTransaction, 0, len(txs))
660+
keysToWrite := make([]*datastore.Key, 0, len(txs))
661+
662+
for i, tx := range txs {
663+
// Skip if existing record has an earlier or equal TimeFirstSeen
664+
if existing[i] != nil && !existing[i].TimeFirstSeen.IsZero() && !tfs.Before(existing[i].TimeFirstSeen) {
665+
continue
666+
}
667+
668+
keysToWrite = append(keysToWrite, keys[i])
650669
transactions = append(transactions, d.newDatastoreTransaction(tx, tfs))
651670
}
652671

653-
if _, err := d.client.PutMulti(ctx, keys, transactions); err != nil {
672+
if len(keysToWrite) == 0 {
673+
return
674+
}
675+
676+
if _, err := d.client.PutMulti(ctx, keysToWrite, transactions); err != nil {
654677
log.Error().Err(err).Msg("Failed to write transactions")
655678
}
656679
}
@@ -659,6 +682,10 @@ func (d *Datastore) ShouldWriteFirstBlockEvent() bool {
659682
return d.shouldWriteFirstBlockEvent
660683
}
661684

685+
func (d *Datastore) ShouldWriteFirstTransactionEvent() bool {
686+
return d.shouldWriteFirstTransactionEvent
687+
}
688+
662689
func (d *Datastore) NodeList(ctx context.Context, limit int) ([]string, error) {
663690
query := datastore.NewQuery(BlockEventsKind).Order("-Time")
664691
iter := d.client.Run(ctx, query)

0 commit comments

Comments (0)