Skip to content

Commit c681f60

Browse files
committed
Store only blocks with logs and checkpoints
1 parent 7667a4d commit c681f60

File tree

4 files changed

+265
-32
lines changed

4 files changed

+265
-32
lines changed

pkg/logpoller/log_poller.go

Lines changed: 51 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1081,7 +1081,7 @@ func (lp *logPoller) handleReorg(ctx context.Context, currentBlock *evmtypes.Hea
10811081
// There can be another reorg while we're finding the LCA.
10821082
// That is ok, since we'll detect it on the next iteration.
10831083
// Since we go currentBlock by currentBlock for unfinalized logs, the mismatch starts at currentBlockNumber - 1.
1084-
blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock, latestBlock.FinalizedBlockNumber)
1084+
blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock.Number, latestBlock.FinalizedBlockNumber)
10851085
if err2 != nil {
10861086
return nil, fmt.Errorf("unable to find LCA after reorg: %w", err2)
10871087
}
@@ -1231,10 +1231,15 @@ func (e *reorgError) Error() string {
12311231
return fmt.Sprintf("reorg detected at block %d", e.ReorgedAt.Number)
12321232
}
12331233

1234-
func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmtypes.Head, latest, safe, finalized int64, isReplay bool) ([]Block, []Log, error) {
1234+
func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmtypes.Head, latest, safe, finalized int64, isReplay bool) (blocks []Block, logs []Log, err error) {
12351235
const maxUnfinalizedBlocks = 2000
1236-
var logs []Log
1237-
var blocks []Block
1236+
var block *Block
1237+
defer func() {
1238+
// ensure that we always include the last block even if it's empty to use it as check point for next poll.
1239+
if block != nil && (len(blocks) == 0 || blocks[len(blocks)-1].BlockNumber != block.BlockNumber) {
1240+
blocks = append(blocks, *block)
1241+
}
1242+
}()
12381243
for {
12391244
h := currentBlock.Hash
12401245
rpcLogs, err := lp.latencyMonitor.FilterLogs(ctx, lp.Filter(nil, nil, &h))
@@ -1243,15 +1248,18 @@ func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmty
12431248
return blocks, logs, nil
12441249
}
12451250
lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlock.Number, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp)
1246-
block := Block{
1251+
block = &Block{
12471252
BlockHash: h,
12481253
BlockNumber: currentBlock.Number,
12491254
BlockTimestamp: currentBlock.Timestamp,
12501255
FinalizedBlockNumber: finalized,
12511256
SafeBlockNumber: safe,
12521257
}
1253-
logs = append(logs, convertLogs(rpcLogs, []Block{block}, lp.lggr, lp.ec.ConfiguredChainID())...)
1254-
blocks = append(blocks, block)
1258+
logs = append(logs, convertLogs(rpcLogs, []Block{*block}, lp.lggr, lp.ec.ConfiguredChainID())...)
1259+
// Always save the block with logs, to know an impact of finality violation and for better observability.
1260+
if len(rpcLogs) > 0 {
1261+
blocks = append(blocks, *block)
1262+
}
12551263

12561264
if currentBlock.Number >= latest {
12571265
return blocks, logs, nil
@@ -1328,39 +1336,50 @@ func (lp *logPoller) latestSafeBlock(ctx context.Context, latestFinalizedBlockNu
13281336

13291337
// Find the first place where our chain and their chain have the same block,
13301338
// that block number is the LCA. Return the block after that, where we want to resume polling.
1331-
func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.Head, latestFinalizedBlockNumber int64) (*evmtypes.Head, error) {
1332-
// Current is where the mismatch starts.
1333-
// Check its parent to see if its the same as ours saved.
1334-
parent, err := lp.latencyMonitor.HeadByHash(ctx, current.ParentHash)
1335-
if err != nil {
1336-
return nil, err
1339+
func (lp *logPoller) findBlockAfterLCA(ctx context.Context, currentHeadNumber int64, dbLatestFinalizedBlockNumber int64) (*evmtypes.Head, error) {
1340+
if currentHeadNumber < dbLatestFinalizedBlockNumber {
1341+
lp.lggr.Criticalw("Unexpected state. Current head number is lower than latest finalized block number", "currentHeadNumber", currentHeadNumber, "dbLatestFinalizedBlockNumber", dbLatestFinalizedBlockNumber)
1342+
return nil, fmt.Errorf("current head number %d is lower than latest finalized block number %d: %w", currentHeadNumber, dbLatestFinalizedBlockNumber, commontypes.ErrFinalityViolated)
13371343
}
1338-
blockAfterLCA := current
1344+
13391345
// We expect reorgs up to the block after latestFinalizedBlock
1340-
// We loop via parent instead of current so current always holds the LCA+1.
13411346
// If the parent block number becomes < the first finalized block our reorg is too deep.
1342-
// This can happen only if finalityTag is not enabled and fixed finalityDepth is provided via config.
1343-
var ourParentBlockHash common.Hash
1344-
for parent.Number >= latestFinalizedBlockNumber {
1345-
outParentBlock, err := lp.orm.SelectBlockByNumber(ctx, parent.Number)
1347+
// This can happen only if finalityTag is not enabled and fixed finalityDepth is provided via config or chain violates finality guarantees.
1348+
for {
1349+
// Since we do not store all blocks in the db, it's possible that we do not have the parent block in our db.
1350+
// Find the nearest ancestor that we have in our db and check if it still belongs to canonical chain.
1351+
ourParent, err := lp.orm.SelectNewestBlock(ctx, currentHeadNumber-1)
13461352
if err != nil {
1347-
return nil, err
1348-
}
1349-
ourParentBlockHash = outParentBlock.BlockHash
1350-
if parent.Hash == ourParentBlockHash {
1351-
// If we do have the blockhash, return blockAfterLCA
1352-
return blockAfterLCA, nil
1353+
if errors.Is(err, sql.ErrNoRows) {
1354+
lp.lggr.Warnw("No ancestor block found in db, this means that the reorg is deeper than the number of blocks we have in the db.", "err", err, "currentHeadNumber", currentHeadNumber, "dbLatestFinalizedBlockNumber", dbLatestFinalizedBlockNumber)
1355+
// we should return currentHeadNumber as the block after LCA, to avoid drifting too far back.
1356+
return lp.headerByNumber(ctx, currentHeadNumber)
1357+
}
1358+
return nil, fmt.Errorf("failed to select ancestor for current block %d: %w", currentHeadNumber-1, err)
13531359
}
1354-
// Otherwise get a new parent and update blockAfterLCA.
1355-
blockAfterLCA = parent
1356-
parent, err = lp.latencyMonitor.HeadByHash(ctx, parent.ParentHash)
1360+
1361+
// Since we are looking for block after LCA, fetch child of ourParent.
1362+
// If new current points to ourParent, we found the LCA and can return block after it. Otherwise, keep looking for ancestors.
1363+
rpcChild, err := lp.headerByNumber(ctx, ourParent.BlockNumber+1)
13571364
if err != nil {
1358-
return nil, err
1365+
return nil, fmt.Errorf("failed to fetch block after ancestor %d: %w", ourParent.BlockNumber+1, err)
1366+
}
1367+
if ourParent.BlockHash == rpcChild.ParentHash {
1368+
return rpcChild, nil
1369+
}
1370+
1371+
if ourParent.BlockNumber <= dbLatestFinalizedBlockNumber {
1372+
lp.lggr.Criticalw("Reorg greater than finality depth detected", "finalityTag", lp.useFinalityTag,
1373+
"current", rpcChild.Number,
1374+
"latestFinalized", dbLatestFinalizedBlockNumber,
1375+
"ourParentHash", ourParent.BlockHash,
1376+
"expectedParentHash", rpcChild.ParentHash,
1377+
"childHash", rpcChild.Hash)
1378+
return nil, fmt.Errorf("%w: finalized block with hash %s is not parent of canonical block at height %d, with parent hash %s", commontypes.ErrFinalityViolated, ourParent.BlockHash, rpcChild.Number, rpcChild.ParentHash)
13591379
}
1360-
}
13611380

1362-
lp.lggr.Criticalw("Reorg greater than finality depth detected", "finalityTag", lp.useFinalityTag, "current", current.Number, "latestFinalized", latestFinalizedBlockNumber)
1363-
return nil, fmt.Errorf("%w: finalized block hash %s does not match RPC's %s at height %d", commontypes.ErrFinalityViolated, ourParentBlockHash, blockAfterLCA.Hash, blockAfterLCA.Number)
1381+
currentHeadNumber = ourParent.BlockNumber
1382+
}
13641383
}
13651384

13661385
// PruneOldBlocks removes blocks that are > lp.keepFinalizedBlocksDepth behind the latest finalized block.

pkg/logpoller/log_poller_internal_test.go

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -958,6 +958,202 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) {
958958
})
959959
}
960960

961+
func Test_FindBlockAfterLCA(t *testing.T) {
962+
testCases := []struct {
963+
Name string
964+
CurrentBlockNumber int64
965+
DBLatestFinalized int64
966+
DBBlocks []int64
967+
Setup func(t *testing.T, ec *clienttest.Client)
968+
ExpectedError error
969+
ExpectedHead *evmtypes.Head
970+
}{
971+
{
972+
Name: "current head lower than DB finalized",
973+
CurrentBlockNumber: 3,
974+
DBLatestFinalized: 5,
975+
DBBlocks: nil,
976+
Setup: nil,
977+
ExpectedError: commontypes.ErrFinalityViolated,
978+
ExpectedHead: nil,
979+
},
980+
{
981+
Name: "no reorg - chains match on first iteration",
982+
CurrentBlockNumber: 5,
983+
DBLatestFinalized: 3,
984+
DBBlocks: []int64{4},
985+
Setup: func(t *testing.T, ec *clienttest.Client) {
986+
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
987+
return newHead(n.Int64()), nil
988+
})
989+
},
990+
ExpectedError: nil,
991+
ExpectedHead: newHead(5),
992+
},
993+
{
994+
Name: "reorg - LCA found after walking back",
995+
CurrentBlockNumber: 5,
996+
DBLatestFinalized: 1,
997+
DBBlocks: []int64{2, 3},
998+
Setup: func(t *testing.T, ec *clienttest.Client) {
999+
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
1000+
num := n.Int64()
1001+
if num == 4 {
1002+
return &evmtypes.Head{Number: 4, Hash: common.BigToHash(big.NewInt(4)), ParentHash: common.HexToHash("0xdead")}, nil
1003+
}
1004+
return newHead(num), nil
1005+
}).Maybe()
1006+
},
1007+
ExpectedError: nil,
1008+
ExpectedHead: newHead(3),
1009+
},
1010+
{
1011+
Name: "reorg too deep",
1012+
CurrentBlockNumber: 5,
1013+
DBLatestFinalized: 2,
1014+
DBBlocks: []int64{1},
1015+
Setup: func(t *testing.T, ec *clienttest.Client) {
1016+
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
1017+
num := n.Int64()
1018+
return &evmtypes.Head{Number: num, Hash: common.BigToHash(big.NewInt(num)), ParentHash: common.HexToHash("0xbeef")}, nil
1019+
})
1020+
},
1021+
ExpectedError: commontypes.ErrFinalityViolated,
1022+
ExpectedHead: nil,
1023+
},
1024+
{
1025+
Name: "RPC HeadByNumber returns error",
1026+
CurrentBlockNumber: 5,
1027+
DBLatestFinalized: 3,
1028+
DBBlocks: []int64{4},
1029+
Setup: func(t *testing.T, ec *clienttest.Client) {
1030+
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
1031+
if n.Int64() == 5 {
1032+
return nil, errors.New("rpc error")
1033+
}
1034+
return newHead(n.Int64()), nil
1035+
})
1036+
},
1037+
ExpectedError: errors.New("rpc error"),
1038+
ExpectedHead: nil,
1039+
},
1040+
{
1041+
Name: "All blocks in DB are on a different chain",
1042+
CurrentBlockNumber: 100,
1043+
DBLatestFinalized: 10,
1044+
DBBlocks: []int64{90, 80, 55, 20},
1045+
Setup: func(t *testing.T, ec *clienttest.Client) {
1046+
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
1047+
head := newHead(n.Int64())
1048+
if n.Int64() < 20 {
1049+
return head, nil
1050+
}
1051+
head.ParentHash = common.HexToHash("0xdead")
1052+
head.Hash = common.HexToHash("0xdead")
1053+
return head, nil
1054+
})
1055+
},
1056+
ExpectedError: nil,
1057+
ExpectedHead: &evmtypes.Head{
1058+
Number: 20,
1059+
Hash: common.HexToHash("0xdead"),
1060+
ParentHash: common.HexToHash("0xdead"),
1061+
},
1062+
},
1063+
{
1064+
Name: "Sparse DB blocks - LCA found successfully",
1065+
CurrentBlockNumber: 100,
1066+
DBLatestFinalized: 10,
1067+
DBBlocks: []int64{90, 80, 55, 20},
1068+
Setup: func(t *testing.T, ec *clienttest.Client) {
1069+
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
1070+
head := newHead(n.Int64())
1071+
if n.Int64() > 21 {
1072+
head.ParentHash = common.HexToHash("0xdead")
1073+
return head, nil
1074+
}
1075+
return head, nil
1076+
})
1077+
},
1078+
ExpectedError: nil,
1079+
ExpectedHead: newHead(21),
1080+
},
1081+
{
1082+
Name: "Child of latest finalized is not canonical - finality violation",
1083+
CurrentBlockNumber: 100,
1084+
DBLatestFinalized: 80,
1085+
DBBlocks: []int64{90, 80, 55, 20},
1086+
Setup: func(t *testing.T, ec *clienttest.Client) {
1087+
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
1088+
head := newHead(n.Int64())
1089+
if n.Int64() > 80 {
1090+
head.ParentHash = common.HexToHash("0xdead")
1091+
return head, nil
1092+
}
1093+
return head, nil
1094+
})
1095+
},
1096+
ExpectedError: commontypes.ErrFinalityViolated,
1097+
ExpectedHead: nil,
1098+
},
1099+
{
1100+
// Such case is possible, since DBLatestFinalized is defined by FinalizedBlockNumber of the latest block.
1101+
Name: "Latest finalized DB block is in canonical but much older than DBLatestFinalized",
1102+
CurrentBlockNumber: 100,
1103+
DBLatestFinalized: 80,
1104+
DBBlocks: []int64{90, 70, 55, 20},
1105+
Setup: func(t *testing.T, ec *clienttest.Client) {
1106+
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
1107+
head := newHead(n.Int64())
1108+
if n.Int64() > 80 {
1109+
head.ParentHash = common.HexToHash("0xdead")
1110+
return head, nil
1111+
}
1112+
return head, nil
1113+
})
1114+
},
1115+
ExpectedError: nil,
1116+
ExpectedHead: newHead(71),
1117+
},
1118+
}
1119+
for _, tc := range testCases {
1120+
tc := tc
1121+
t.Run(tc.Name, func(t *testing.T) {
1122+
t.Parallel()
1123+
db := testutils.NewSqlxDB(t)
1124+
lggr, _ := logger.TestObserved(t, zapcore.DebugLevel)
1125+
orm := NewORM(testutils.NewRandomEVMChainID(), db, lggr)
1126+
headTracker := headstest.NewTracker[*evmtypes.Head, common.Hash](t)
1127+
ec := clienttest.NewClient(t)
1128+
ctx := testutils.Context(t)
1129+
for _, blockNum := range tc.DBBlocks {
1130+
hash := common.BigToHash(big.NewInt(blockNum))
1131+
require.NoError(t, orm.InsertBlock(ctx, hash, blockNum, time.Now(), blockNum, blockNum))
1132+
}
1133+
if tc.Setup != nil {
1134+
tc.Setup(t, ec)
1135+
}
1136+
lpOpts := Opts{
1137+
PollPeriod: time.Second,
1138+
FinalityDepth: 3,
1139+
BackfillBatchSize: 3,
1140+
RPCBatchSize: 3,
1141+
KeepFinalizedBlocksDepth: 20,
1142+
BackupPollerBlockDelay: 0,
1143+
}
1144+
lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
1145+
blockAfterLCA, err := lp.findBlockAfterLCA(ctx, tc.CurrentBlockNumber, tc.DBLatestFinalized)
1146+
if tc.ExpectedError != nil {
1147+
require.ErrorContains(t, err, tc.ExpectedError.Error())
1148+
require.Nil(t, blockAfterLCA)
1149+
} else {
1150+
require.NoError(t, err)
1151+
require.Equal(t, tc.ExpectedHead, blockAfterLCA)
1152+
}
1153+
})
1154+
}
1155+
}
1156+
9611157
func benchmarkFilter(b *testing.B, nFilters, nAddresses, nEvents int) {
9621158
lggr := logger.Test(b)
9631159
lpOpts := Opts{

pkg/logpoller/observability.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,12 @@ func (o *ObservedORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumb
138138
})
139139
}
140140

141+
func (o *ObservedORM) SelectNewestBlock(ctx context.Context, maxAllowedBlockNumber int64) (*Block, error) {
142+
return withObservedQuery(ctx, o, "SelectNewestBlock", func() (*Block, error) {
143+
return o.ORM.SelectNewestBlock(ctx, maxAllowedBlockNumber)
144+
})
145+
}
146+
141147
func (o *ObservedORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs evmtypes.Confirmations) (*Log, error) {
142148
return withObservedQuery(ctx, o, "SelectLatestLogByEventSigWithConfs", func() (*Log, error) {
143149
return o.ORM.SelectLatestLogByEventSigWithConfs(ctx, eventSig, address, confs)

pkg/logpoller/orm.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type ORM interface {
4343
SelectBlockByHash(ctx context.Context, hash common.Hash) (*Block, error)
4444
SelectLatestBlock(ctx context.Context) (*Block, error)
4545
SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*Block, error)
46+
SelectNewestBlock(ctx context.Context, maxAllowedBlockNumber int64) (*Block, error)
4647
SelectLatestFinalizedBlock(ctx context.Context) (*Block, error)
4748

4849
SelectLogs(ctx context.Context, start, end int64, address common.Address, eventSig common.Hash) ([]Log, error)
@@ -295,6 +296,17 @@ func (o *DSORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int
295296
return &b, nil
296297
}
297298

299+
func (o *DSORM) SelectNewestBlock(ctx context.Context, maxAllowedBlockNumber int64) (*Block, error) {
300+
var b Block
301+
if err := o.ds.GetContext(ctx, &b,
302+
blocksQuery(`WHERE evm_chain_id = $1 AND block_number <= $2 ORDER BY block_number DESC LIMIT 1`),
303+
sqlutil.New(o.chainID), maxAllowedBlockNumber,
304+
); err != nil {
305+
return nil, err
306+
}
307+
return &b, nil
308+
}
309+
298310
func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs evmtypes.Confirmations) (*Log, error) {
299311
args, err := newQueryArgsForEvent(o.chainID, address, eventSig).
300312
withConfs(confs).

0 commit comments

Comments
 (0)