Skip to content

Commit 4cad2fc

Browse files
committed
Store only blocks with logs and checkpoints
1 parent 89af143 commit 4cad2fc

4 files changed

Lines changed: 265 additions & 32 deletions

File tree

pkg/logpoller/log_poller.go

Lines changed: 51 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1070,7 +1070,7 @@ func (lp *logPoller) handleReorg(ctx context.Context, currentBlock *evmtypes.Hea
10701070
// There can be another reorg while we're finding the LCA.
10711071
// That is ok, since we'll detect it on the next iteration.
10721072
// Since we go currentBlock by currentBlock for unfinalized logs, the mismatch starts at currentBlockNumber - 1.
1073-
blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock, latestBlock.FinalizedBlockNumber)
1073+
blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock.Number, latestBlock.FinalizedBlockNumber)
10741074
if err2 != nil {
10751075
return nil, fmt.Errorf("unable to find LCA after reorg: %w", err2)
10761076
}
@@ -1220,10 +1220,15 @@ func (e *reorgError) Error() string {
12201220
return fmt.Sprintf("reorg detected at block %d", e.ReorgedAt.Number)
12211221
}
12221222

1223-
func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmtypes.Head, latest, safe, finalized int64, isReplay bool) ([]Block, []Log, error) {
1223+
func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmtypes.Head, latest, safe, finalized int64, isReplay bool) (blocks []Block, logs []Log, err error) {
12241224
const maxUnfinalizedBlocks = 2000
1225-
var logs []Log
1226-
var blocks []Block
1225+
var block *Block
1226+
defer func() {
1227+
// ensure that we always include the last block even if it's empty to use it as check point for next poll.
1228+
if block != nil && (len(blocks) == 0 || blocks[len(blocks)-1].BlockNumber != block.BlockNumber) {
1229+
blocks = append(blocks, *block)
1230+
}
1231+
}()
12271232
for {
12281233
h := currentBlock.Hash
12291234
rpcLogs, err := lp.latencyMonitor.FilterLogs(ctx, lp.Filter(nil, nil, &h))
@@ -1232,15 +1237,18 @@ func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmty
12321237
return blocks, logs, nil
12331238
}
12341239
lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlock.Number, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp)
1235-
block := Block{
1240+
block = &Block{
12361241
BlockHash: h,
12371242
BlockNumber: currentBlock.Number,
12381243
BlockTimestamp: currentBlock.Timestamp,
12391244
FinalizedBlockNumber: finalized,
12401245
SafeBlockNumber: safe,
12411246
}
1242-
logs = append(logs, convertLogs(rpcLogs, []Block{block}, lp.lggr, lp.ec.ConfiguredChainID())...)
1243-
blocks = append(blocks, block)
1247+
logs = append(logs, convertLogs(rpcLogs, []Block{*block}, lp.lggr, lp.ec.ConfiguredChainID())...)
1248+
// Always save the block with logs, to know an impact of finality violation and for better observability.
1249+
if len(rpcLogs) > 0 {
1250+
blocks = append(blocks, *block)
1251+
}
12441252

12451253
if currentBlock.Number >= latest {
12461254
return blocks, logs, nil
@@ -1317,39 +1325,50 @@ func (lp *logPoller) latestSafeBlock(ctx context.Context, latestFinalizedBlockNu
13171325

13181326
// Find the first place where our chain and their chain have the same block,
13191327
// that block number is the LCA. Return the block after that, where we want to resume polling.
1320-
func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.Head, latestFinalizedBlockNumber int64) (*evmtypes.Head, error) {
1321-
// Current is where the mismatch starts.
1322-
// Check its parent to see if its the same as ours saved.
1323-
parent, err := lp.latencyMonitor.HeadByHash(ctx, current.ParentHash)
1324-
if err != nil {
1325-
return nil, err
1328+
func (lp *logPoller) findBlockAfterLCA(ctx context.Context, currentHeadNumber int64, dbLatestFinalizedBlockNumber int64) (*evmtypes.Head, error) {
1329+
if currentHeadNumber < dbLatestFinalizedBlockNumber {
1330+
lp.lggr.Criticalw("Unexpected state. Current head number is lower than latest finalized block number", "currentHeadNumber", currentHeadNumber, "dbLatestFinalizedBlockNumber", dbLatestFinalizedBlockNumber)
1331+
return nil, fmt.Errorf("current head number %d is lower than latest finalized block number %d: %w", currentHeadNumber, dbLatestFinalizedBlockNumber, commontypes.ErrFinalityViolated)
13261332
}
1327-
blockAfterLCA := current
1333+
13281334
// We expect reorgs up to the block after latestFinalizedBlock
1329-
// We loop via parent instead of current so current always holds the LCA+1.
13301335
// If the parent block number becomes < the first finalized block our reorg is too deep.
1331-
// This can happen only if finalityTag is not enabled and fixed finalityDepth is provided via config.
1332-
var ourParentBlockHash common.Hash
1333-
for parent.Number >= latestFinalizedBlockNumber {
1334-
outParentBlock, err := lp.orm.SelectBlockByNumber(ctx, parent.Number)
1336+
// This can happen only if finalityTag is not enabled and fixed finalityDepth is provided via config or chain violates finality guarantees.
1337+
for {
1338+
// Since we do not store all blocks in the db, it's possible that we do not have the parent block in our db.
1339+
// Find the nearest ancestor that we have in our db and check if it still belongs to canonical chain.
1340+
ourParent, err := lp.orm.SelectNewestBlock(ctx, currentHeadNumber-1)
13351341
if err != nil {
1336-
return nil, err
1337-
}
1338-
ourParentBlockHash = outParentBlock.BlockHash
1339-
if parent.Hash == ourParentBlockHash {
1340-
// If we do have the blockhash, return blockAfterLCA
1341-
return blockAfterLCA, nil
1342+
if errors.Is(err, sql.ErrNoRows) {
1343+
lp.lggr.Warnw("No ancestor block found in db, this means that the reorg is deeper than the number of blocks we have in the db.", "err", err, "currentHeadNumber", currentHeadNumber, "dbLatestFinalizedBlockNumber", dbLatestFinalizedBlockNumber)
1344+
// we should return currentHeadNumber as the block after LCA, to avoid drifting too far back.
1345+
return lp.headerByNumber(ctx, currentHeadNumber)
1346+
}
1347+
return nil, fmt.Errorf("failed to select ancestor for current block %d: %w", currentHeadNumber-1, err)
13421348
}
1343-
// Otherwise get a new parent and update blockAfterLCA.
1344-
blockAfterLCA = parent
1345-
parent, err = lp.latencyMonitor.HeadByHash(ctx, parent.ParentHash)
1349+
1350+
// Since we are looking for block after LCA, fetch child of ourParent.
1351+
// If new current points to ourParent, we found the LCA and can return block after it. Otherwise, keep looking for ancestors.
1352+
rpcChild, err := lp.headerByNumber(ctx, ourParent.BlockNumber+1)
13461353
if err != nil {
1347-
return nil, err
1354+
return nil, fmt.Errorf("failed to fetch block after ancestor %d: %w", ourParent.BlockNumber+1, err)
1355+
}
1356+
if ourParent.BlockHash == rpcChild.ParentHash {
1357+
return rpcChild, nil
1358+
}
1359+
1360+
if ourParent.BlockNumber <= dbLatestFinalizedBlockNumber {
1361+
lp.lggr.Criticalw("Reorg greater than finality depth detected", "finalityTag", lp.useFinalityTag,
1362+
"current", rpcChild.Number,
1363+
"latestFinalized", dbLatestFinalizedBlockNumber,
1364+
"ourParentHash", ourParent.BlockHash,
1365+
"expectedParentHash", rpcChild.ParentHash,
1366+
"childHash", rpcChild.Hash)
1367+
return nil, fmt.Errorf("%w: finalized block with hash %s is not parent of canonical block at height %d, with parent hash %s", commontypes.ErrFinalityViolated, ourParent.BlockHash, rpcChild.Number, rpcChild.ParentHash)
13481368
}
1349-
}
13501369

1351-
lp.lggr.Criticalw("Reorg greater than finality depth detected", "finalityTag", lp.useFinalityTag, "current", current.Number, "latestFinalized", latestFinalizedBlockNumber)
1352-
return nil, fmt.Errorf("%w: finalized block hash %s does not match RPC's %s at height %d", commontypes.ErrFinalityViolated, ourParentBlockHash, blockAfterLCA.Hash, blockAfterLCA.Number)
1370+
currentHeadNumber = ourParent.BlockNumber
1371+
}
13531372
}
13541373

13551374
// PruneOldBlocks removes blocks that are > lp.keepFinalizedBlocksDepth behind the latest finalized block.

pkg/logpoller/log_poller_internal_test.go

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -849,6 +849,202 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) {
849849
})
850850
}
851851

852+
func Test_FindBlockAfterLCA(t *testing.T) {
853+
testCases := []struct {
854+
Name string
855+
CurrentBlockNumber int64
856+
DBLatestFinalized int64
857+
DBBlocks []int64
858+
Setup func(t *testing.T, ec *clienttest.Client)
859+
ExpectedError error
860+
ExpectedHead *evmtypes.Head
861+
}{
862+
{
863+
Name: "current head lower than DB finalized",
864+
CurrentBlockNumber: 3,
865+
DBLatestFinalized: 5,
866+
DBBlocks: nil,
867+
Setup: nil,
868+
ExpectedError: commontypes.ErrFinalityViolated,
869+
ExpectedHead: nil,
870+
},
871+
{
872+
Name: "no reorg - chains match on first iteration",
873+
CurrentBlockNumber: 5,
874+
DBLatestFinalized: 3,
875+
DBBlocks: []int64{4},
876+
Setup: func(t *testing.T, ec *clienttest.Client) {
877+
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
878+
return newHead(n.Int64()), nil
879+
})
880+
},
881+
ExpectedError: nil,
882+
ExpectedHead: newHead(5),
883+
},
884+
{
885+
Name: "reorg - LCA found after walking back",
886+
CurrentBlockNumber: 5,
887+
DBLatestFinalized: 1,
888+
DBBlocks: []int64{2, 3},
889+
Setup: func(t *testing.T, ec *clienttest.Client) {
890+
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
891+
num := n.Int64()
892+
if num == 4 {
893+
return &evmtypes.Head{Number: 4, Hash: common.BigToHash(big.NewInt(4)), ParentHash: common.HexToHash("0xdead")}, nil
894+
}
895+
return newHead(num), nil
896+
}).Maybe()
897+
},
898+
ExpectedError: nil,
899+
ExpectedHead: newHead(3),
900+
},
901+
{
902+
Name: "reorg too deep",
903+
CurrentBlockNumber: 5,
904+
DBLatestFinalized: 2,
905+
DBBlocks: []int64{1},
906+
Setup: func(t *testing.T, ec *clienttest.Client) {
907+
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
908+
num := n.Int64()
909+
return &evmtypes.Head{Number: num, Hash: common.BigToHash(big.NewInt(num)), ParentHash: common.HexToHash("0xbeef")}, nil
910+
})
911+
},
912+
ExpectedError: commontypes.ErrFinalityViolated,
913+
ExpectedHead: nil,
914+
},
915+
{
916+
Name: "RPC HeadByNumber returns error",
917+
CurrentBlockNumber: 5,
918+
DBLatestFinalized: 3,
919+
DBBlocks: []int64{4},
920+
Setup: func(t *testing.T, ec *clienttest.Client) {
921+
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
922+
if n.Int64() == 5 {
923+
return nil, errors.New("rpc error")
924+
}
925+
return newHead(n.Int64()), nil
926+
})
927+
},
928+
ExpectedError: errors.New("rpc error"),
929+
ExpectedHead: nil,
930+
},
931+
{
932+
Name: "All blocks in DB are on a different chain",
933+
CurrentBlockNumber: 100,
934+
DBLatestFinalized: 10,
935+
DBBlocks: []int64{90, 80, 55, 20},
936+
Setup: func(t *testing.T, ec *clienttest.Client) {
937+
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
938+
head := newHead(n.Int64())
939+
if n.Int64() < 20 {
940+
return head, nil
941+
}
942+
head.ParentHash = common.HexToHash("0xdead")
943+
head.Hash = common.HexToHash("0xdead")
944+
return head, nil
945+
})
946+
},
947+
ExpectedError: nil,
948+
ExpectedHead: &evmtypes.Head{
949+
Number: 20,
950+
Hash: common.HexToHash("0xdead"),
951+
ParentHash: common.HexToHash("0xdead"),
952+
},
953+
},
954+
{
955+
Name: "Sparse DB blocks - LCA found successfully",
956+
CurrentBlockNumber: 100,
957+
DBLatestFinalized: 10,
958+
DBBlocks: []int64{90, 80, 55, 20},
959+
Setup: func(t *testing.T, ec *clienttest.Client) {
960+
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
961+
head := newHead(n.Int64())
962+
if n.Int64() > 21 {
963+
head.ParentHash = common.HexToHash("0xdead")
964+
return head, nil
965+
}
966+
return head, nil
967+
})
968+
},
969+
ExpectedError: nil,
970+
ExpectedHead: newHead(21),
971+
},
972+
{
973+
Name: "Child of latest finalized is not canonical - finality violation",
974+
CurrentBlockNumber: 100,
975+
DBLatestFinalized: 80,
976+
DBBlocks: []int64{90, 80, 55, 20},
977+
Setup: func(t *testing.T, ec *clienttest.Client) {
978+
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
979+
head := newHead(n.Int64())
980+
if n.Int64() > 80 {
981+
head.ParentHash = common.HexToHash("0xdead")
982+
return head, nil
983+
}
984+
return head, nil
985+
})
986+
},
987+
ExpectedError: commontypes.ErrFinalityViolated,
988+
ExpectedHead: nil,
989+
},
990+
{
991+
// Such case is possible, since DBLatestFinalized is defined by FinalizedBlockNumber of the latest block.
992+
Name: "Latest finalized DB block is in canonical but much older than DBLatestFinalized",
993+
CurrentBlockNumber: 100,
994+
DBLatestFinalized: 80,
995+
DBBlocks: []int64{90, 70, 55, 20},
996+
Setup: func(t *testing.T, ec *clienttest.Client) {
997+
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
998+
head := newHead(n.Int64())
999+
if n.Int64() > 80 {
1000+
head.ParentHash = common.HexToHash("0xdead")
1001+
return head, nil
1002+
}
1003+
return head, nil
1004+
})
1005+
},
1006+
ExpectedError: nil,
1007+
ExpectedHead: newHead(71),
1008+
},
1009+
}
1010+
for _, tc := range testCases {
1011+
tc := tc
1012+
t.Run(tc.Name, func(t *testing.T) {
1013+
t.Parallel()
1014+
db := testutils.NewSqlxDB(t)
1015+
lggr, _ := logger.TestObserved(t, zapcore.DebugLevel)
1016+
orm := NewORM(testutils.NewRandomEVMChainID(), db, lggr)
1017+
headTracker := headstest.NewTracker[*evmtypes.Head, common.Hash](t)
1018+
ec := clienttest.NewClient(t)
1019+
ctx := testutils.Context(t)
1020+
for _, blockNum := range tc.DBBlocks {
1021+
hash := common.BigToHash(big.NewInt(blockNum))
1022+
require.NoError(t, orm.InsertBlock(ctx, hash, blockNum, time.Now(), blockNum, blockNum))
1023+
}
1024+
if tc.Setup != nil {
1025+
tc.Setup(t, ec)
1026+
}
1027+
lpOpts := Opts{
1028+
PollPeriod: time.Second,
1029+
FinalityDepth: 3,
1030+
BackfillBatchSize: 3,
1031+
RPCBatchSize: 3,
1032+
KeepFinalizedBlocksDepth: 20,
1033+
BackupPollerBlockDelay: 0,
1034+
}
1035+
lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
1036+
blockAfterLCA, err := lp.findBlockAfterLCA(ctx, tc.CurrentBlockNumber, tc.DBLatestFinalized)
1037+
if tc.ExpectedError != nil {
1038+
require.ErrorContains(t, err, tc.ExpectedError.Error())
1039+
require.Nil(t, blockAfterLCA)
1040+
} else {
1041+
require.NoError(t, err)
1042+
require.Equal(t, tc.ExpectedHead, blockAfterLCA)
1043+
}
1044+
})
1045+
}
1046+
}
1047+
8521048
func benchmarkFilter(b *testing.B, nFilters, nAddresses, nEvents int) {
8531049
lggr := logger.Test(b)
8541050
lpOpts := Opts{

pkg/logpoller/observability.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,12 @@ func (o *ObservedORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumb
138138
})
139139
}
140140

141+
func (o *ObservedORM) SelectNewestBlock(ctx context.Context, maxAllowedBlockNumber int64) (*Block, error) {
142+
return withObservedQuery(ctx, o, "SelectNewestBlock", func() (*Block, error) {
143+
return o.ORM.SelectNewestBlock(ctx, maxAllowedBlockNumber)
144+
})
145+
}
146+
141147
func (o *ObservedORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs evmtypes.Confirmations) (*Log, error) {
142148
return withObservedQuery(ctx, o, "SelectLatestLogByEventSigWithConfs", func() (*Log, error) {
143149
return o.ORM.SelectLatestLogByEventSigWithConfs(ctx, eventSig, address, confs)

pkg/logpoller/orm.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ type ORM interface {
4444
SelectBlockByHash(ctx context.Context, hash common.Hash) (*Block, error)
4545
SelectLatestBlock(ctx context.Context) (*Block, error)
4646
SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*Block, error)
47+
SelectNewestBlock(ctx context.Context, maxAllowedBlockNumber int64) (*Block, error)
4748
SelectLatestFinalizedBlock(ctx context.Context) (*Block, error)
4849

4950
SelectLogs(ctx context.Context, start, end int64, address common.Address, eventSig common.Hash) ([]Log, error)
@@ -296,6 +297,17 @@ func (o *DSORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int
296297
return &b, nil
297298
}
298299

300+
func (o *DSORM) SelectNewestBlock(ctx context.Context, maxAllowedBlockNumber int64) (*Block, error) {
301+
var b Block
302+
if err := o.ds.GetContext(ctx, &b,
303+
blocksQuery(`WHERE evm_chain_id = $1 AND block_number <= $2 ORDER BY block_number DESC LIMIT 1`),
304+
ubig.New(o.chainID), maxAllowedBlockNumber,
305+
); err != nil {
306+
return nil, err
307+
}
308+
return &b, nil
309+
}
310+
299311
func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs evmtypes.Confirmations) (*Log, error) {
300312
args, err := newQueryArgsForEvent(o.chainID, address, eventSig).
301313
withConfs(confs).

0 commit comments

Comments
 (0)