Skip to content

Commit 63ad281

Browse files
authored
feat(sensor): --write-first-block-event flag to reduce Datastore writes (#867)
* feat(sensor): time first seen by sensor * chore: improve mergeSensorTimes * fix: simplify logic * fix: use slice instead of map * feat: --write-first-block-event flag * chore: go fmt * fix: remove redundant branch in writeFirstSeen and update shouldWrite to modified * fix: flag logic * fix: mutual exclusive flag * docs: make gen * fix: missing if check bug * feat: add sensor ID to API * feat: fix race conditions * fix: comment
1 parent 4dee897 commit 63ad281

File tree

13 files changed

+109
-64
lines changed

13 files changed

+109
-64
lines changed

cmd/p2p/sensor/api.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func newBlockInfo(header *types.Header) *blockInfo {
4747

4848
// apiData represents all sensor information including node info and peer data.
4949
type apiData struct {
50+
SensorID string `json:"sensor_id"`
5051
ENR string `json:"enr"`
5152
URL string `json:"enode"`
5253
PeerCount int `json:"peer_count"`
@@ -105,6 +106,7 @@ func handleAPI(server *ethp2p.Server, conns *p2p.Conns) {
105106
}
106107

107108
data := apiData{
109+
SensorID: inputSensorParams.SensorID,
108110
ENR: server.NodeInfo().ENR,
109111
URL: server.Self().URLv4(),
110112
PeerCount: len(peers),
@@ -123,4 +125,3 @@ func handleAPI(server *ethp2p.Server, conns *p2p.Conns) {
123125
log.Error().Err(err).Msg("Failed to start API handler")
124126
}
125127
}
126-

cmd/p2p/sensor/sensor.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ import (
2929
"github.com/rs/zerolog/log"
3030
"github.com/spf13/cobra"
3131

32-
ds "github.com/0xPolygon/polygon-cli/p2p/datastructures"
3332
"github.com/0xPolygon/polygon-cli/flag"
3433
"github.com/0xPolygon/polygon-cli/p2p"
3534
"github.com/0xPolygon/polygon-cli/p2p/database"
35+
ds "github.com/0xPolygon/polygon-cli/p2p/datastructures"
3636
"github.com/0xPolygon/polygon-cli/rpctypes"
3737
)
3838

@@ -50,6 +50,7 @@ type (
5050
MaxDatabaseConcurrency int
5151
ShouldWriteBlocks bool
5252
ShouldWriteBlockEvents bool
53+
ShouldWriteFirstBlockEvent bool
5354
ShouldWriteTransactions bool
5455
ShouldWriteTransactionEvents bool
5556
ShouldWritePeers bool
@@ -438,6 +439,7 @@ func newDatabase(ctx context.Context) (database.Database, error) {
438439
MaxConcurrency: inputSensorParams.MaxDatabaseConcurrency,
439440
ShouldWriteBlocks: inputSensorParams.ShouldWriteBlocks,
440441
ShouldWriteBlockEvents: inputSensorParams.ShouldWriteBlockEvents,
442+
ShouldWriteFirstBlockEvent: inputSensorParams.ShouldWriteFirstBlockEvent,
441443
ShouldWriteTransactions: inputSensorParams.ShouldWriteTransactions,
442444
ShouldWriteTransactionEvents: inputSensorParams.ShouldWriteTransactionEvents,
443445
ShouldWritePeers: inputSensorParams.ShouldWritePeers,
@@ -476,6 +478,8 @@ func init() {
476478
will result in less chance of missing data but can significantly increase memory usage)`)
477479
f.BoolVarP(&inputSensorParams.ShouldWriteBlocks, "write-blocks", "B", true, "write blocks to database")
478480
f.BoolVar(&inputSensorParams.ShouldWriteBlockEvents, "write-block-events", true, "write block events to database")
481+
f.BoolVar(&inputSensorParams.ShouldWriteFirstBlockEvent, "write-first-block-event", false,
482+
"write one block event on first-seen only (requires --write-block-events=false)")
479483
f.BoolVarP(&inputSensorParams.ShouldWriteTransactions, "write-txs", "t", true,
480484
`write transactions to database (this option can significantly increase CPU and memory usage)`)
481485
f.BoolVar(&inputSensorParams.ShouldWriteTransactionEvents, "write-tx-events", true,

cmd/root.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@ import (
2929
"github.com/0xPolygon/polygon-cli/cmd/p2p"
3030
"github.com/0xPolygon/polygon-cli/cmd/parsebatchl2data"
3131
"github.com/0xPolygon/polygon-cli/cmd/parseethwallet"
32+
"github.com/0xPolygon/polygon-cli/cmd/plot"
3233
"github.com/0xPolygon/polygon-cli/cmd/publish"
3334
"github.com/0xPolygon/polygon-cli/cmd/report"
3435
"github.com/0xPolygon/polygon-cli/cmd/retest"
3536
"github.com/0xPolygon/polygon-cli/cmd/rpcfuzz"
3637
"github.com/0xPolygon/polygon-cli/cmd/signer"
37-
"github.com/0xPolygon/polygon-cli/cmd/plot"
3838
"github.com/0xPolygon/polygon-cli/cmd/ulxly"
3939
"github.com/0xPolygon/polygon-cli/cmd/version"
4040
"github.com/0xPolygon/polygon-cli/cmd/wallet"

doc/polycli_p2p_sensor.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ polycli p2p sensor amoy-nodes.json \
142142
--txs-cache-ttl duration time to live for transaction cache entries (0 for no expiration) (default 10m0s)
143143
--write-block-events write block events to database (default true)
144144
-B, --write-blocks write blocks to database (default true)
145+
--write-first-block-event write one block event on first-seen only (requires --write-block-events=false)
145146
--write-peers write peers to database (default true)
146147
--write-tx-events write transaction events to database (this option can significantly increase CPU and memory usage) (default true)
147148
-t, --write-txs write transactions to database (this option can significantly increase CPU and memory usage) (default true)

loadtest/gasmanager/wave.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,11 @@ func (w *wave) MoveNext() {
7070
}
7171
}
7272

73-
func (w *wave) Y() float64 { return w.points[w.x] }
74-
func (w *wave) X() float64 { return w.x }
75-
func (w *wave) Period() uint64 { return w.config.Period }
73+
func (w *wave) Y() float64 { return w.points[w.x] }
74+
func (w *wave) X() float64 { return w.x }
75+
func (w *wave) Period() uint64 { return w.config.Period }
7676
func (w *wave) Amplitude() uint64 { return w.config.Amplitude }
77-
func (w *wave) Target() uint64 { return w.config.Target }
77+
func (w *wave) Target() uint64 { return w.config.Target }
7878

7979
// Wave computation functions
8080

p2p/database/database.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type Database interface {
3030

3131
// WriteBlockHashFirstSeen writes a partial block entry with just the hash
3232
// first seen time if the block doesn't exist yet.
33-
WriteBlockHashFirstSeen(context.Context, common.Hash, time.Time)
33+
WriteBlockHashFirstSeen(context.Context, *enode.Node, common.Hash, time.Time)
3434

3535
// WriteBlockBody will write the block bodies if ShouldWriteBlocks returns
3636
// true.
@@ -51,6 +51,7 @@ type Database interface {
5151
MaxConcurrentWrites() int
5252
ShouldWriteBlocks() bool
5353
ShouldWriteBlockEvents() bool
54+
ShouldWriteFirstBlockEvent() bool
5455
ShouldWriteTransactions() bool
5556
ShouldWriteTransactionEvents() bool
5657
ShouldWritePeers() bool

p2p/database/datastore.go

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type Datastore struct {
3535
maxConcurrency int
3636
shouldWriteBlocks bool
3737
shouldWriteBlockEvents bool
38+
shouldWriteFirstBlockEvent bool
3839
shouldWriteTransactions bool
3940
shouldWriteTransactionEvents bool
4041
shouldWritePeers bool
@@ -127,6 +128,7 @@ type DatastoreOptions struct {
127128
MaxConcurrency int
128129
ShouldWriteBlocks bool
129130
ShouldWriteBlockEvents bool
131+
ShouldWriteFirstBlockEvent bool
130132
ShouldWriteTransactions bool
131133
ShouldWriteTransactionEvents bool
132134
ShouldWritePeers bool
@@ -148,6 +150,7 @@ func NewDatastore(ctx context.Context, opts DatastoreOptions) Database {
148150
maxConcurrency: opts.MaxConcurrency,
149151
shouldWriteBlocks: opts.ShouldWriteBlocks,
150152
shouldWriteBlockEvents: opts.ShouldWriteBlockEvents,
153+
shouldWriteFirstBlockEvent: opts.ShouldWriteFirstBlockEvent,
151154
shouldWriteTransactions: opts.ShouldWriteTransactions,
152155
shouldWriteTransactionEvents: opts.ShouldWriteTransactionEvents,
153156
shouldWritePeers: opts.ShouldWritePeers,
@@ -231,18 +234,28 @@ func (d *Datastore) WriteBlockHashes(ctx context.Context, peer *enode.Node, hash
231234
// WriteBlockHashFirstSeen writes a partial block entry with just the hash
232235
// first seen time if the block doesn't exist yet. If it exists, updates the
233236
// TimeFirstSeenHash if the new time is earlier.
234-
func (d *Datastore) WriteBlockHashFirstSeen(ctx context.Context, hash common.Hash, tfsh time.Time) {
235-
if d.client == nil || !d.ShouldWriteBlocks() {
237+
func (d *Datastore) WriteBlockHashFirstSeen(ctx context.Context, peer *enode.Node, hash common.Hash, tfsh time.Time) {
238+
if d.client == nil || (!d.ShouldWriteBlocks() && !d.shouldWriteFirstBlockEvent) {
236239
return
237240
}
238241

239242
d.runAsync(func() {
240-
d.writeBlockHashFirstSeen(ctx, hash, tfsh)
243+
d.writeBlockHashFirstSeen(ctx, peer, hash, tfsh)
241244
})
242245
}
243246

244247
// writeBlockHashFirstSeen performs the actual transaction to write or update the block hash first seen time.
245-
func (d *Datastore) writeBlockHashFirstSeen(ctx context.Context, hash common.Hash, tfsh time.Time) {
248+
func (d *Datastore) writeBlockHashFirstSeen(ctx context.Context, peer *enode.Node, hash common.Hash, tfsh time.Time) {
249+
// Write block event if flag enabled and block events are disabled (mutually exclusive).
250+
// Cache check in protocol.go already verified first-seen.
251+
if d.shouldWriteFirstBlockEvent && !d.ShouldWriteBlockEvents() && peer != nil {
252+
d.writeEvent(peer, BlockEventsKind, hash, BlocksKind, tfsh)
253+
}
254+
255+
if !d.shouldWriteBlocks {
256+
return
257+
}
258+
246259
key := datastore.NameKey(BlocksKind, hash.Hex(), nil)
247260

248261
_, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
@@ -390,17 +403,8 @@ func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time, isPa
390403
}
391404
}
392405

393-
// writeFirstSeen updates timing fields on a header and block, preserving earlier timestamps.
394-
func (d *Datastore) writeFirstSeen(header *DatastoreHeader, block *DatastoreBlock, tfs time.Time) {
395-
// Preserve earlier header timing if it exists
396-
if block.DatastoreHeader != nil &&
397-
!block.TimeFirstSeen.IsZero() &&
398-
block.TimeFirstSeen.Before(tfs) {
399-
header.TimeFirstSeen = block.TimeFirstSeen
400-
header.SensorFirstSeen = block.SensorFirstSeen
401-
}
402-
403-
// Set hash timing if it doesn't exist or if new timestamp is earlier
406+
// writeFirstSeen sets hash timing if it doesn't exist or if new timestamp is earlier.
407+
func (d *Datastore) writeFirstSeen(block *DatastoreBlock, tfs time.Time) {
404408
if block.TimeFirstSeenHash.IsZero() || tfs.Before(block.TimeFirstSeenHash) {
405409
block.TimeFirstSeenHash = tfs
406410
block.SensorFirstSeenHash = d.sensorID
@@ -456,27 +460,27 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.
456460
// are nil we will just set them.
457461
_ = tx.Get(key, &dsBlock)
458462

459-
shouldWrite := false
463+
modified := false
460464

461465
if dsBlock.DatastoreHeader == nil || tfs.Before(dsBlock.TimeFirstSeen) {
462-
shouldWrite = true
466+
modified = true
463467

464468
// Create new header with current timing
465469
header := d.newDatastoreHeader(block.Header(), tfs, false)
466470

467-
// Preserve earlier timestamps from any earlier announcement
468-
d.writeFirstSeen(header, &dsBlock, tfs)
471+
// Preserve earliest first-seen timestamp
472+
d.writeFirstSeen(&dsBlock, tfs)
469473

470474
dsBlock.DatastoreHeader = header
471475
}
472476

473477
if len(dsBlock.TotalDifficulty) == 0 {
474-
shouldWrite = true
478+
modified = true
475479
dsBlock.TotalDifficulty = td.String()
476480
}
477481

478482
if dsBlock.Transactions == nil && len(block.Transactions()) > 0 {
479-
shouldWrite = true
483+
modified = true
480484
if d.shouldWriteTransactions {
481485
d.writeTransactions(ctx, block.Transactions(), tfs)
482486
}
@@ -488,15 +492,15 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.
488492
}
489493

490494
if dsBlock.Uncles == nil && len(block.Uncles()) > 0 {
491-
shouldWrite = true
495+
modified = true
492496
dsBlock.Uncles = make([]*datastore.Key, 0, len(block.Uncles()))
493497
for _, uncle := range block.Uncles() {
494498
d.writeBlockHeader(ctx, uncle, tfs, false)
495499
dsBlock.Uncles = append(dsBlock.Uncles, datastore.NameKey(BlocksKind, uncle.Hash().Hex(), nil))
496500
}
497501
}
498502

499-
if shouldWrite {
503+
if modified {
500504
_, err := tx.Put(key, &dsBlock)
501505
return err
502506
}
@@ -568,8 +572,8 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header,
568572
// Create new header with current timing
569573
newHeader := d.newDatastoreHeader(header, tfs, isParent)
570574

571-
// Preserve earlier timestamps from any earlier announcement or full block
572-
d.writeFirstSeen(newHeader, &block, tfs)
575+
// Preserve earliest first-seen timestamp
576+
d.writeFirstSeen(&block, tfs)
573577

574578
block.DatastoreHeader = newHeader
575579
_, err = tx.Put(key, &block)
@@ -590,10 +594,10 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has
590594
log.Debug().Err(err).Str("hash", hash.Hex()).Msg("Failed to fetch block when writing block body")
591595
}
592596

593-
shouldWrite := false
597+
modified := false
594598

595599
if block.Transactions == nil && len(body.Transactions) > 0 {
596-
shouldWrite = true
600+
modified = true
597601
if d.shouldWriteTransactions {
598602
d.writeTransactions(ctx, body.Transactions, tfs)
599603
}
@@ -605,15 +609,15 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has
605609
}
606610

607611
if block.Uncles == nil && len(body.Uncles) > 0 {
608-
shouldWrite = true
612+
modified = true
609613
block.Uncles = make([]*datastore.Key, 0, len(body.Uncles))
610614
for _, uncle := range body.Uncles {
611615
d.writeBlockHeader(ctx, uncle, tfs, false)
612616
block.Uncles = append(block.Uncles, datastore.NameKey(BlocksKind, uncle.Hash().Hex(), nil))
613617
}
614618
}
615619

616-
if shouldWrite {
620+
if modified {
617621
_, err := tx.Put(key, &block)
618622
return err
619623
}
@@ -642,6 +646,10 @@ func (d *Datastore) writeTransactions(ctx context.Context, txs []*types.Transact
642646
}
643647
}
644648

649+
func (d *Datastore) ShouldWriteFirstBlockEvent() bool {
650+
return d.shouldWriteFirstBlockEvent
651+
}
652+
645653
func (d *Datastore) NodeList(ctx context.Context, limit int) ([]string, error) {
646654
query := datastore.NewQuery(BlockEventsKind).Order("-Time")
647655
iter := d.client.Run(ctx, query)

p2p/database/json.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ func (j *JSONDatabase) WriteBlockHashes(ctx context.Context, peer *enode.Node, h
238238

239239
// WriteBlockHashFirstSeen writes a partial block entry with just the hash
240240
// first seen time. For JSON output, this writes a separate record type.
241-
func (j *JSONDatabase) WriteBlockHashFirstSeen(ctx context.Context, hash common.Hash, tfsh time.Time) {
241+
func (j *JSONDatabase) WriteBlockHashFirstSeen(ctx context.Context, peer *enode.Node, hash common.Hash, tfsh time.Time) {
242242
if !j.ShouldWriteBlocks() {
243243
return
244244
}
@@ -374,6 +374,11 @@ func (j *JSONDatabase) ShouldWriteBlockEvents() bool {
374374
return j.shouldWriteBlockEvents
375375
}
376376

377+
// ShouldWriteFirstBlockEvent returns false for JSON database.
378+
func (j *JSONDatabase) ShouldWriteFirstBlockEvent() bool {
379+
return false
380+
}
381+
377382
// ShouldWriteTransactions returns the configured value.
378383
func (j *JSONDatabase) ShouldWriteTransactions() bool {
379384
return j.shouldWriteTransactions

p2p/database/nodb.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func (n *nodb) WriteBlockHashes(ctx context.Context, peer *enode.Node, hashes []
3434
}
3535

3636
// WriteBlockHashFirstSeen does nothing.
37-
func (n *nodb) WriteBlockHashFirstSeen(ctx context.Context, hash common.Hash, tfsh time.Time) {
37+
func (n *nodb) WriteBlockHashFirstSeen(ctx context.Context, peer *enode.Node, hash common.Hash, tfsh time.Time) {
3838
}
3939

4040
// WriteBlockBody does nothing.
@@ -69,6 +69,11 @@ func (n *nodb) ShouldWriteBlockEvents() bool {
6969
return false
7070
}
7171

72+
// ShouldWriteFirstBlockEvent returns false.
73+
func (n *nodb) ShouldWriteFirstBlockEvent() bool {
74+
return false
75+
}
76+
7277
// ShouldWriteTransactions returns false.
7378
func (n *nodb) ShouldWriteTransactions() bool {
7479
return false

p2p/datastructures/bloomset_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,4 +200,3 @@ func BenchmarkBloomSetFilterNotContained(b *testing.B) {
200200
bloom.FilterNotContained(batch)
201201
}
202202
}
203-

0 commit comments

Comments
 (0)