Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 68 additions & 64 deletions cmd/p2p/sensor/sensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,58 +35,59 @@ import (

type (
sensorParams struct {
Bootnodes string
NetworkID uint64
NodesFile string
StaticNodesFile string
TrustedNodesFile string
ProjectID string
DatabaseID string
SensorID string
MaxPeers int
MaxDatabaseConcurrency int
ShouldWriteBlocks bool
ShouldWriteBlockEvents bool
ShouldWriteFirstBlockEvent bool
ShouldWriteTransactions bool
ShouldWriteTransactionEvents bool
ShouldWritePeers bool
ShouldBroadcastTx bool
ShouldBroadcastTxHashes bool
ShouldBroadcastBlocks bool
ShouldBroadcastBlockHashes bool
BroadcastWorkers int
TxBatchTimeout time.Duration
TxBroadcastQueueSize int
MaxTxPacketSize int
MaxQueuedTxs int
ShouldRunPprof bool
PprofPort uint
ShouldRunPrometheus bool
PrometheusPort uint
APIPort uint
RPCPort uint
KeyFile string
PrivateKey string
Port int
DiscoveryPort int
RPC string
GenesisHash string
ForkID []byte
DialRatio int
NAT string
TTL time.Duration
DiscoveryDNS string
Database string
NoDiscovery bool
ProxyRPC bool
ProxyRPCTimeout time.Duration
RequestsCache ds.LRUOptions
ParentsCache ds.LRUOptions
BlocksCache ds.LRUOptions
TxsCache ds.LRUOptions
KnownTxsBloom ds.BloomSetOptions
KnownBlocksMax int
Bootnodes string
NetworkID uint64
NodesFile string
StaticNodesFile string
TrustedNodesFile string
ProjectID string
DatabaseID string
SensorID string
MaxPeers int
MaxDatabaseConcurrency int
ShouldWriteBlocks bool
ShouldWriteBlockEvents bool
ShouldWriteFirstBlockEvent bool
ShouldWriteTransactions bool
ShouldWriteTransactionEvents bool
ShouldWriteFirstTransactionEvent bool
ShouldWritePeers bool
ShouldBroadcastTx bool
ShouldBroadcastTxHashes bool
ShouldBroadcastBlocks bool
ShouldBroadcastBlockHashes bool
BroadcastWorkers int
TxBatchTimeout time.Duration
TxBroadcastQueueSize int
MaxTxPacketSize int
MaxQueuedTxs int
ShouldRunPprof bool
PprofPort uint
ShouldRunPrometheus bool
PrometheusPort uint
APIPort uint
RPCPort uint
KeyFile string
PrivateKey string
Port int
DiscoveryPort int
RPC string
GenesisHash string
ForkID []byte
DialRatio int
NAT string
TTL time.Duration
DiscoveryDNS string
Database string
NoDiscovery bool
ProxyRPC bool
ProxyRPCTimeout time.Duration
RequestsCache ds.LRUOptions
ParentsCache ds.LRUOptions
BlocksCache ds.LRUOptions
TxsCache ds.LRUOptions
KnownTxsBloom ds.BloomSetOptions
KnownBlocksMax int

bootnodes []*enode.Node
staticNodes []*enode.Node
Expand Down Expand Up @@ -437,18 +438,19 @@ func newDatabase(ctx context.Context) (database.Database, error) {
switch inputSensorParams.Database {
case "datastore":
return database.NewDatastore(ctx, database.DatastoreOptions{
ProjectID: inputSensorParams.ProjectID,
DatabaseID: inputSensorParams.DatabaseID,
SensorID: inputSensorParams.SensorID,
ChainID: inputSensorParams.NetworkID,
MaxConcurrency: inputSensorParams.MaxDatabaseConcurrency,
ShouldWriteBlocks: inputSensorParams.ShouldWriteBlocks,
ShouldWriteBlockEvents: inputSensorParams.ShouldWriteBlockEvents,
ShouldWriteFirstBlockEvent: inputSensorParams.ShouldWriteFirstBlockEvent,
ShouldWriteTransactions: inputSensorParams.ShouldWriteTransactions,
ShouldWriteTransactionEvents: inputSensorParams.ShouldWriteTransactionEvents,
ShouldWritePeers: inputSensorParams.ShouldWritePeers,
TTL: inputSensorParams.TTL,
ProjectID: inputSensorParams.ProjectID,
DatabaseID: inputSensorParams.DatabaseID,
SensorID: inputSensorParams.SensorID,
ChainID: inputSensorParams.NetworkID,
MaxConcurrency: inputSensorParams.MaxDatabaseConcurrency,
ShouldWriteBlocks: inputSensorParams.ShouldWriteBlocks,
ShouldWriteBlockEvents: inputSensorParams.ShouldWriteBlockEvents,
ShouldWriteFirstBlockEvent: inputSensorParams.ShouldWriteFirstBlockEvent,
ShouldWriteTransactions: inputSensorParams.ShouldWriteTransactions,
ShouldWriteTransactionEvents: inputSensorParams.ShouldWriteTransactionEvents,
ShouldWriteFirstTransactionEvent: inputSensorParams.ShouldWriteFirstTransactionEvent,
ShouldWritePeers: inputSensorParams.ShouldWritePeers,
TTL: inputSensorParams.TTL,
}), nil
case "json":
return database.NewJSONDatabase(database.JSONDatabaseOptions{
Expand Down Expand Up @@ -489,6 +491,8 @@ will result in less chance of missing data but can significantly increase memory
`write transactions to database (this option can significantly increase CPU and memory usage)`)
f.BoolVar(&inputSensorParams.ShouldWriteTransactionEvents, "write-tx-events", true,
`write transaction events to database (this option can significantly increase CPU and memory usage)`)
f.BoolVar(&inputSensorParams.ShouldWriteFirstTransactionEvent, "write-first-tx-event", false,
"write one transaction event on first-seen only (requires --write-tx-events=false)")
f.BoolVar(&inputSensorParams.ShouldWritePeers, "write-peers", true, "write peers to database")
f.BoolVar(&inputSensorParams.ShouldBroadcastTx, "broadcast-txs", false, "broadcast full transactions to peers")
f.BoolVar(&inputSensorParams.ShouldBroadcastTxHashes, "broadcast-tx-hashes", false, "broadcast transaction hashes to peers")
Expand Down
1 change: 1 addition & 0 deletions doc/polycli_p2p_sensor.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ polycli p2p sensor amoy-nodes.json \
--write-block-events write block events to database (default true)
-B, --write-blocks write blocks to database (default true)
--write-first-block-event write one block event on first-seen only (requires --write-block-events=false)
--write-first-tx-event write one transaction event on first-seen only (requires --write-tx-events=false)
--write-peers write peers to database (default true)
--write-tx-events write transaction events to database (this option can significantly increase CPU and memory usage) (default true)
-t, --write-txs write transactions to database (this option can significantly increase CPU and memory usage) (default true)
Expand Down
1 change: 1 addition & 0 deletions p2p/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Database interface {
ShouldWriteFirstBlockEvent() bool
ShouldWriteTransactions() bool
ShouldWriteTransactionEvents() bool
ShouldWriteFirstTransactionEvent() bool
ShouldWritePeers() bool

// NodeList will return a list of enode URLs.
Expand Down
169 changes: 98 additions & 71 deletions p2p/database/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,19 @@ const (
// Datastore wraps the datastore client, stores the sensorID, and other
// information needed when writing blocks and transactions.
type Datastore struct {
client *datastore.Client
sensorID string
chainID *big.Int
maxConcurrency int
shouldWriteBlocks bool
shouldWriteBlockEvents bool
shouldWriteFirstBlockEvent bool
shouldWriteTransactions bool
shouldWriteTransactionEvents bool
shouldWritePeers bool
jobs chan struct{}
ttl time.Duration
client *datastore.Client
sensorID string
chainID *big.Int
maxConcurrency int
shouldWriteBlocks bool
shouldWriteBlockEvents bool
shouldWriteFirstBlockEvent bool
shouldWriteTransactions bool
shouldWriteTransactionEvents bool
shouldWriteFirstTransactionEvent bool
shouldWritePeers bool
ttl time.Duration
jobs chan struct{}
}

// DatastoreEvent can represent a peer sending the sensor a transaction hash or
Expand Down Expand Up @@ -94,20 +95,21 @@ type DatastoreBlock struct {
not indexed because there is a max size for indexed byte slices, which Data
will occasionally exceed.
type DatastoreTransaction struct {
Data []byte `datastore:",noindex"`
From string
Gas string
GasFeeCap string
GasPrice string
GasTipCap string
Nonce string
To string
Value string
V, R, S string `datastore:",noindex"`
Time time.Time
TimeFirstSeen time.Time
TTL time.Time
Type int16
Data []byte `datastore:",noindex"`
From string
Gas string `datastore:",noindex"`
GasFeeCap string `datastore:",noindex"`
GasPrice string `datastore:",noindex"`
GasTipCap string `datastore:",noindex"`
Nonce string `datastore:",noindex"`
To string
Value string `datastore:",noindex"`
V, R, S string `datastore:",noindex"`
Time time.Time
TimeFirstSeen time.Time
TTL time.Time
Type int16
SensorFirstSeen string
}

type DatastorePeer struct {
Expand All @@ -121,18 +123,19 @@ type DatastorePeer struct {

// DatastoreOptions is used when creating a NewDatastore.
type DatastoreOptions struct {
ProjectID string
DatabaseID string
SensorID string
ChainID uint64
MaxConcurrency int
ShouldWriteBlocks bool
ShouldWriteBlockEvents bool
ShouldWriteFirstBlockEvent bool
ShouldWriteTransactions bool
ShouldWriteTransactionEvents bool
ShouldWritePeers bool
TTL time.Duration
ProjectID string
DatabaseID string
SensorID string
ChainID uint64
MaxConcurrency int
ShouldWriteBlocks bool
ShouldWriteBlockEvents bool
ShouldWriteFirstBlockEvent bool
ShouldWriteTransactions bool
ShouldWriteTransactionEvents bool
ShouldWriteFirstTransactionEvent bool
ShouldWritePeers bool
TTL time.Duration
}

// NewDatastore connects to datastore and creates the client. This should
Expand All @@ -144,18 +147,19 @@ func NewDatastore(ctx context.Context, opts DatastoreOptions) Database {
}

return &Datastore{
client: client,
sensorID: opts.SensorID,
chainID: new(big.Int).SetUint64(opts.ChainID),
maxConcurrency: opts.MaxConcurrency,
shouldWriteBlocks: opts.ShouldWriteBlocks,
shouldWriteBlockEvents: opts.ShouldWriteBlockEvents,
shouldWriteFirstBlockEvent: opts.ShouldWriteFirstBlockEvent,
shouldWriteTransactions: opts.ShouldWriteTransactions,
shouldWriteTransactionEvents: opts.ShouldWriteTransactionEvents,
shouldWritePeers: opts.ShouldWritePeers,
jobs: make(chan struct{}, opts.MaxConcurrency),
ttl: opts.TTL,
client: client,
sensorID: opts.SensorID,
chainID: new(big.Int).SetUint64(opts.ChainID),
maxConcurrency: opts.MaxConcurrency,
shouldWriteBlocks: opts.ShouldWriteBlocks,
shouldWriteBlockEvents: opts.ShouldWriteBlockEvents,
shouldWriteFirstBlockEvent: opts.ShouldWriteFirstBlockEvent,
shouldWriteTransactions: opts.ShouldWriteTransactions,
shouldWriteTransactionEvents: opts.ShouldWriteTransactionEvents,
shouldWriteFirstTransactionEvent: opts.ShouldWriteFirstTransactionEvent,
shouldWritePeers: opts.ShouldWritePeers,
jobs: make(chan struct{}, opts.MaxConcurrency),
ttl: opts.TTL,
}
}

Expand Down Expand Up @@ -432,22 +436,23 @@ func (d *Datastore) newDatastoreTransaction(tx *types.Transaction, tfs time.Time
}

return &DatastoreTransaction{
Data: tx.Data(),
From: from,
Gas: fmt.Sprint(tx.Gas()),
GasFeeCap: tx.GasFeeCap().String(),
GasPrice: tx.GasPrice().String(),
GasTipCap: tx.GasTipCap().String(),
Nonce: fmt.Sprint(tx.Nonce()),
To: to,
Value: tx.Value().String(),
V: v.String(),
R: r.String(),
S: s.String(),
Time: tx.Time(),
TimeFirstSeen: tfs,
TTL: tfs.Add(d.ttl),
Type: int16(tx.Type()),
Data: tx.Data(),
From: from,
Gas: fmt.Sprint(tx.Gas()),
GasFeeCap: tx.GasFeeCap().String(),
GasPrice: tx.GasPrice().String(),
GasTipCap: tx.GasTipCap().String(),
Nonce: fmt.Sprint(tx.Nonce()),
To: to,
Value: tx.Value().String(),
V: v.String(),
R: r.String(),
S: s.String(),
Time: tx.Time(),
TimeFirstSeen: tfs,
TTL: tfs.Add(d.ttl),
Type: int16(tx.Type()),
SensorFirstSeen: d.sensorID,
}
}

Expand Down Expand Up @@ -639,18 +644,36 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has
}
}

// writeTransactions will write the transactions to datastore and return the
// transaction hashes.
// writeTransactions will write the transactions to datastore, skipping
// transactions that already exist with an earlier or equal TimeFirstSeen.
func (d *Datastore) writeTransactions(ctx context.Context, txs []*types.Transaction, tfs time.Time) {
keys := make([]*datastore.Key, 0, len(txs))
transactions := make([]*DatastoreTransaction, 0, len(txs))

for _, tx := range txs {
keys = append(keys, datastore.NameKey(TransactionsKind, tx.Hash().Hex(), nil))
}

// Fetch existing transactions to check TimeFirstSeen
existing := make([]*DatastoreTransaction, len(keys))
_ = d.client.GetMulti(ctx, keys, existing) // Ignore errors - missing keys return nil

transactions := make([]*DatastoreTransaction, 0, len(txs))
keysToWrite := make([]*datastore.Key, 0, len(txs))

for i, tx := range txs {
// Skip if existing record has an earlier or equal TimeFirstSeen
if existing[i] != nil && !existing[i].TimeFirstSeen.IsZero() && !tfs.Before(existing[i].TimeFirstSeen) {
continue
}

keysToWrite = append(keysToWrite, keys[i])
transactions = append(transactions, d.newDatastoreTransaction(tx, tfs))
}

if _, err := d.client.PutMulti(ctx, keys, transactions); err != nil {
if len(keysToWrite) == 0 {
return
}

if _, err := d.client.PutMulti(ctx, keysToWrite, transactions); err != nil {
log.Error().Err(err).Msg("Failed to write transactions")
}
}
Expand All @@ -659,6 +682,10 @@ func (d *Datastore) ShouldWriteFirstBlockEvent() bool {
return d.shouldWriteFirstBlockEvent
}

// ShouldWriteFirstTransactionEvent reports whether the sensor should write
// only one transaction event per transaction, on first sight, instead of an
// event for every observation (configured via --write-first-tx-event).
func (d *Datastore) ShouldWriteFirstTransactionEvent() bool {
	return d.shouldWriteFirstTransactionEvent
}

func (d *Datastore) NodeList(ctx context.Context, limit int) ([]string, error) {
query := datastore.NewQuery(BlockEventsKind).Order("-Time")
iter := d.client.Run(ctx, query)
Expand Down
Loading
Loading