@@ -23,7 +23,7 @@ const (
2323 TransactionsKind = "transactions"
2424 TransactionEventsKind = "transaction_events"
2525 PeersKind = "peers"
26- MaxAttempts = 3
26+ MaxAttempts = 5
2727)
2828
2929// Datastore wraps the datastore client, stores the sensorID, and other
@@ -57,33 +57,36 @@ type DatastoreEvent struct {
5757// DatastoreHeader stores the data in manner that can be easily written without
5858// loss of precision.
5959type DatastoreHeader struct {
60- ParentHash * datastore.Key
61- UncleHash string `datastore:",noindex"`
62- Coinbase string `datastore:",noindex"`
63- Root string `datastore:",noindex"`
64- TxHash string `datastore:",noindex"`
65- ReceiptHash string `datastore:",noindex"`
66- Bloom []byte `datastore:",noindex"`
67- Difficulty string `datastore:",noindex"`
68- Number string
69- GasLimit string `datastore:",noindex"`
70- GasUsed string
71- Time time.Time
72- Extra []byte `datastore:",noindex"`
73- MixDigest string `datastore:",noindex"`
74- Nonce string `datastore:",noindex"`
75- BaseFee string `datastore:",noindex"`
76- TimeFirstSeen time.Time
77- TTL time.Time
78- IsParent bool
60+ ParentHash * datastore.Key
61+ UncleHash string `datastore:",noindex"`
62+ Coinbase string `datastore:",noindex"`
63+ Root string `datastore:",noindex"`
64+ TxHash string `datastore:",noindex"`
65+ ReceiptHash string `datastore:",noindex"`
66+ Bloom []byte `datastore:",noindex"`
67+ Difficulty string `datastore:",noindex"`
68+ Number string
69+ GasLimit string `datastore:",noindex"`
70+ GasUsed string
71+ Time time.Time
72+ Extra []byte `datastore:",noindex"`
73+ MixDigest string `datastore:",noindex"`
74+ Nonce string `datastore:",noindex"`
75+ BaseFee string `datastore:",noindex"`
76+ TimeFirstSeen time.Time
77+ TTL time.Time
78+ IsParent bool
79+ SensorFirstSeen string
7980}
8081
8182// DatastoreBlock represents a block stored in datastore.
8283type DatastoreBlock struct {
8384 * DatastoreHeader
84- TotalDifficulty string `datastore:",noindex"`
85- Transactions []* datastore.Key `datastore:",noindex"`
86- Uncles []* datastore.Key `datastore:",noindex"`
85+ TotalDifficulty string `datastore:",noindex"`
86+ Transactions []* datastore.Key `datastore:",noindex"`
87+ Uncles []* datastore.Key `datastore:",noindex"`
88+ TimeFirstSeenHash time.Time
89+ SensorFirstSeenHash string
8790}
8891
8992// DatastoreTransaction represents a transaction stored in datastore. Data is
@@ -225,6 +228,52 @@ func (d *Datastore) WriteBlockHashes(ctx context.Context, peer *enode.Node, hash
225228 })
226229}
227230
231+ // WriteBlockHashFirstSeen writes a partial block entry with just the hash
232+ // first seen time if the block doesn't exist yet. If it exists, updates the
233+ // TimeFirstSeenHash if the new time is earlier.
234+ func (d * Datastore ) WriteBlockHashFirstSeen (ctx context.Context , hash common.Hash , tfsh time.Time ) {
235+ if d .client == nil || ! d .ShouldWriteBlocks () {
236+ return
237+ }
238+
239+ d .runAsync (func () {
240+ d .writeBlockHashFirstSeen (ctx , hash , tfsh )
241+ })
242+ }
243+
244+ // writeBlockHashFirstSeen performs the actual transaction to write or update the block hash first seen time.
245+ func (d * Datastore ) writeBlockHashFirstSeen (ctx context.Context , hash common.Hash , tfsh time.Time ) {
246+ key := datastore .NameKey (BlocksKind , hash .Hex (), nil )
247+
248+ _ , err := d .client .RunInTransaction (ctx , func (tx * datastore.Transaction ) error {
249+ var block DatastoreBlock
250+ err := tx .Get (key , & block )
251+
252+ // If block doesn't exist, create partial entry with just hash timing
253+ if err != nil {
254+ block .TimeFirstSeenHash = tfsh
255+ block .SensorFirstSeenHash = d .sensorID
256+ _ , err = tx .Put (key , & block )
257+ return err
258+ }
259+
260+ // If timestamp already set and not earlier, no update needed
261+ if ! block .TimeFirstSeenHash .IsZero () && ! tfsh .Before (block .TimeFirstSeenHash ) {
262+ return nil
263+ }
264+
265+ // Update with earlier timestamp
266+ block .TimeFirstSeenHash = tfsh
267+ block .SensorFirstSeenHash = d .sensorID
268+ _ , err = tx .Put (key , & block )
269+ return err
270+ }, datastore .MaxAttempts (MaxAttempts ))
271+
272+ if err != nil {
273+ log .Error ().Err (err ).Str ("hash" , hash .Hex ()).Msg ("Failed to write block hash first seen" )
274+ }
275+ }
276+
228277// WriteTransactions will write the transactions and transaction events to datastore.
229278func (d * Datastore ) WriteTransactions (ctx context.Context , peer * enode.Node , txs []* types.Transaction , tfs time.Time ) {
230279 if d .client == nil {
@@ -318,25 +367,43 @@ func (d *Datastore) HasBlock(ctx context.Context, hash common.Hash) bool {
318367// values are converted into strings to prevent a loss of precision.
319368func (d * Datastore ) newDatastoreHeader (header * types.Header , tfs time.Time , isParent bool ) * DatastoreHeader {
320369 return & DatastoreHeader {
321- ParentHash : datastore .NameKey (BlocksKind , header .ParentHash .Hex (), nil ),
322- UncleHash : header .UncleHash .Hex (),
323- Coinbase : header .Coinbase .Hex (),
324- Root : header .Root .Hex (),
325- TxHash : header .TxHash .Hex (),
326- ReceiptHash : header .ReceiptHash .Hex (),
327- Bloom : header .Bloom .Bytes (),
328- Difficulty : header .Difficulty .String (),
329- Number : header .Number .String (),
330- GasLimit : fmt .Sprint (header .GasLimit ),
331- GasUsed : fmt .Sprint (header .GasUsed ),
332- Time : time .Unix (int64 (header .Time ), 0 ),
333- Extra : header .Extra ,
334- MixDigest : header .MixDigest .String (),
335- Nonce : fmt .Sprint (header .Nonce .Uint64 ()),
336- BaseFee : header .BaseFee .String (),
337- TimeFirstSeen : tfs ,
338- TTL : tfs .Add (d .ttl ),
339- IsParent : isParent ,
370+ ParentHash : datastore .NameKey (BlocksKind , header .ParentHash .Hex (), nil ),
371+ UncleHash : header .UncleHash .Hex (),
372+ Coinbase : header .Coinbase .Hex (),
373+ Root : header .Root .Hex (),
374+ TxHash : header .TxHash .Hex (),
375+ ReceiptHash : header .ReceiptHash .Hex (),
376+ Bloom : header .Bloom .Bytes (),
377+ Difficulty : header .Difficulty .String (),
378+ Number : header .Number .String (),
379+ GasLimit : fmt .Sprint (header .GasLimit ),
380+ GasUsed : fmt .Sprint (header .GasUsed ),
381+ Time : time .Unix (int64 (header .Time ), 0 ),
382+ Extra : header .Extra ,
383+ MixDigest : header .MixDigest .String (),
384+ Nonce : fmt .Sprint (header .Nonce .Uint64 ()),
385+ BaseFee : header .BaseFee .String (),
386+ TimeFirstSeen : tfs ,
387+ TTL : tfs .Add (d .ttl ),
388+ IsParent : isParent ,
389+ SensorFirstSeen : d .sensorID ,
390+ }
391+ }
392+
393+ // writeFirstSeen updates timing fields on a header and block, preserving earlier timestamps.
394+ func (d * Datastore ) writeFirstSeen (header * DatastoreHeader , block * DatastoreBlock , tfs time.Time ) {
395+ // Preserve earlier header timing if it exists
396+ if block .DatastoreHeader != nil &&
397+ ! block .DatastoreHeader .TimeFirstSeen .IsZero () &&
398+ block .DatastoreHeader .TimeFirstSeen .Before (tfs ) {
399+ header .TimeFirstSeen = block .DatastoreHeader .TimeFirstSeen
400+ header .SensorFirstSeen = block .DatastoreHeader .SensorFirstSeen
401+ }
402+
403+ // Set hash timing if it doesn't exist or if new timestamp is earlier
404+ if block .TimeFirstSeenHash .IsZero () || tfs .Before (block .TimeFirstSeenHash ) {
405+ block .TimeFirstSeenHash = tfs
406+ block .SensorFirstSeenHash = d .sensorID
340407 }
341408}
342409
@@ -391,9 +458,16 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.
391458
392459 shouldWrite := false
393460
394- if dsBlock .DatastoreHeader == nil {
461+ if dsBlock .DatastoreHeader == nil || tfs . Before ( dsBlock . DatastoreHeader . TimeFirstSeen ) {
395462 shouldWrite = true
396- dsBlock .DatastoreHeader = d .newDatastoreHeader (block .Header (), tfs , false )
463+
464+ // Create new header with current timing
465+ header := d .newDatastoreHeader (block .Header (), tfs , false )
466+
467+ // Preserve earlier timestamps from any earlier announcement
468+ d .writeFirstSeen (header , & dsBlock , tfs )
469+
470+ dsBlock .DatastoreHeader = header
397471 }
398472
399473 if len (dsBlock .TotalDifficulty ) == 0 {
@@ -484,12 +558,21 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header,
484558
485559 _ , err := d .client .RunInTransaction (ctx , func (tx * datastore.Transaction ) error {
486560 var block DatastoreBlock
487- if err := tx .Get (key , & block ); err == nil && block .DatastoreHeader != nil {
561+ err := tx .Get (key , & block )
562+
563+ // If block header already exists and new timestamp is not earlier, don't overwrite
564+ if err == nil && block .DatastoreHeader != nil && ! tfs .Before (block .DatastoreHeader .TimeFirstSeen ) {
488565 return nil
489566 }
490567
491- block .DatastoreHeader = d .newDatastoreHeader (header , tfs , isParent )
492- _ , err := tx .Put (key , & block )
568+ // Create new header with current timing
569+ newHeader := d .newDatastoreHeader (header , tfs , isParent )
570+
571+ // Preserve earlier timestamps from any earlier announcement or full block
572+ d .writeFirstSeen (newHeader , & block , tfs )
573+
574+ block .DatastoreHeader = newHeader
575+ _ , err = tx .Put (key , & block )
493576 return err
494577 }, datastore .MaxAttempts (MaxAttempts ))
495578
0 commit comments