
Commit abead20

Fix reorg handling during replay
1 parent 250d065 commit abead20

4 files changed

Lines changed: 126 additions & 57 deletions


pkg/logpoller/helper_test.go

Lines changed: 1 addition & 1 deletion
@@ -164,7 +164,7 @@ func (th *TestHarness) AdjustTime(t *testing.T, d time.Duration) {
 }
 
 func (th *TestHarness) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) int64 {
-	th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber)
+	th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber, false)
 	latest, _ := th.LogPoller.LatestBlock(ctx)
 	return latest.BlockNumber + 1
 }

pkg/logpoller/log_poller.go

Lines changed: 112 additions & 43 deletions
@@ -76,7 +76,7 @@ type LogPoller interface {
 
 type LogPollerTest interface {
 	LogPoller
-	PollAndSaveLogs(ctx context.Context, currentBlockNumber int64)
+	PollAndSaveLogs(ctx context.Context, currentBlockNumber int64, isReplay bool)
 	BackupPollAndSaveLogs(ctx context.Context) error
 	Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQuery
 	GetReplayFromBlock(ctx context.Context, requested int64) (int64, error)
@@ -662,7 +662,7 @@ func (lp *logPoller) run() {
 			} else {
 				start = lastProcessed.BlockNumber + 1
 			}
-			lp.PollAndSaveLogs(ctx, start)
+			lp.PollAndSaveLogs(ctx, start, false)
 		case <-backupLogPollTicker.C:
 			if lp.backupPollerBlockDelay == 0 {
 				continue // backup poller is disabled
@@ -772,7 +772,7 @@ func (lp *logPoller) handleReplayRequest(ctx context.Context, fromBlockReq int64
 		if err == nil {
 			// Serially process replay requests.
 			lp.lggr.Infow("Executing replay", "fromBlock", fromBlock, "requested", fromBlockReq)
-			lp.PollAndSaveLogs(ctx, fromBlock)
+			lp.PollAndSaveLogs(ctx, fromBlock, true)
 			lp.lggr.Infow("Executing replay finished", "fromBlock", fromBlock, "requested", fromBlockReq)
 		}
 	} else {
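
Taken together with the run() hunk above, the two production call sites now encode the distinction directly: the ticker-driven poll loop passes false, while handleReplayRequest passes true, so the extra database consistency checks introduced below only run during replays. As an illustrative fragment (not part of this commit; lp, ctx and replayFrom are assumed to exist in the caller), the same convention applies to anyone driving the poller through the LogPollerTest interface:

	lp.PollAndSaveLogs(ctx, lastProcessed.BlockNumber+1, false) // ordinary poll tick
	lp.PollAndSaveLogs(ctx, replayFrom, true)                   // replaying an already-saved range
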
@@ -954,6 +954,26 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
 	return nil
 }
 
+func (lp *logPoller) headerByNumber(ctx context.Context, blockNumber int64) (*evmtypes.Head, error) {
+	// If we don't have the current block already, lets get it.
+	header, err := lp.latencyMonitor.HeadByNumber(ctx, big.NewInt(blockNumber))
+	if err != nil {
+		lp.lggr.Warnw("Unable to get currentBlock", "err", err, "blockNumber", blockNumber)
+		return nil, fmt.Errorf("unable to get current block header for block number %d: %w", blockNumber, err)
+	}
+	// Additional sanity checks, don't necessarily trust the RPC.
+	if header == nil {
+		lp.lggr.Errorw("Unexpected nil block from RPC", "blockNumber", blockNumber)
+		return nil, fmt.Errorf("got nil block for %d", blockNumber)
+	}
+	if header.Number != blockNumber {
+		lp.lggr.Warnw("Unable to get currentBlock, rpc returned incorrect block", "blockNumber", blockNumber, "got", header.Number)
+		return nil, fmt.Errorf("block mismatch have %d want %d", header.Number, blockNumber)
+	}
+
+	return header, nil
+}
+
 // getCurrentBlockMaybeHandleReorg accepts a block number
 // and will return that block if its parent points to our last saved block.
 // One can optionally pass the block header if it has already been queried to avoid an extra RPC call.
@@ -962,23 +982,12 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
 // 1. Find the LCA by following parent hashes.
 // 2. Delete all logs and blocks after the LCA
 // 3. Return the LCA+1, i.e. our new current (unprocessed) block.
-func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, currentBlockNumber int64, currentBlock *evmtypes.Head) (head *evmtypes.Head, err error) {
+func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, currentBlockNumber int64, currentBlock *evmtypes.Head, isReplay bool) (head *evmtypes.Head, err error) {
 	var err1 error
 	if currentBlock == nil {
-		// If we don't have the current block already, lets get it.
-		currentBlock, err1 = lp.latencyMonitor.HeadByNumber(ctx, big.NewInt(currentBlockNumber))
+		currentBlock, err1 = lp.headerByNumber(ctx, currentBlockNumber)
 		if err1 != nil {
-			lp.lggr.Warnw("Unable to get currentBlock", "err", err1, "currentBlockNumber", currentBlockNumber)
-			return nil, err1
-		}
-		// Additional sanity checks, don't necessarily trust the RPC.
-		if currentBlock == nil {
-			lp.lggr.Errorw("Unexpected nil block from RPC", "currentBlockNumber", currentBlockNumber)
-			return nil, pkgerrors.Errorf("Got nil block for %d", currentBlockNumber)
-		}
-		if currentBlock.Number != currentBlockNumber {
-			lp.lggr.Warnw("Unable to get currentBlock, rpc returned incorrect block", "currentBlockNumber", currentBlockNumber, "got", currentBlock.Number)
-			return nil, pkgerrors.Errorf("Block mismatch have %d want %d", currentBlock.Number, currentBlockNumber)
+			return nil, fmt.Errorf("unable to get current block header for block number %d: %w", currentBlockNumber, err1)
 		}
 	}
 	// Does this currentBlock point to the same parent that we have saved?
@@ -997,39 +1006,99 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren
 	}
 	// Check for reorg.
 	if currentBlock.ParentHash != expectedParent.BlockHash {
-		// There can be another reorg while we're finding the LCA.
-		// That is ok, since we'll detect it on the next iteration.
-		// Since we go currentBlock by currentBlock for unfinalized logs, the mismatch starts at currentBlockNumber - 1.
-		blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock, expectedParent.FinalizedBlockNumber)
-		if err2 != nil {
-			return nil, fmt.Errorf("unable to find LCA after reorg: %w", err2)
+		return lp.handleReorg(ctx, currentBlock)
+	}
+
+	if !isReplay {
+		// During normal polling DB does not have any blocks after currentBlockNumber, so no reorg is possible. We can skip extra checks and just return currentBlock.
+		return currentBlock, nil
+	}
+
+	// Ensure that if DB contains current block it matches the current block from RPC.
+	currentBlockDB, err := lp.orm.SelectBlockByNumber(ctx, currentBlockNumber)
+	if err != nil && !pkgerrors.Is(err, sql.ErrNoRows) {
+		return nil, fmt.Errorf("failed to get current block from DB %d: %w", currentBlockNumber, err)
+	}
+
+	if currentBlockDB != nil && currentBlock.Hash != currentBlockDB.BlockHash {
+		return lp.handleReorg(ctx, currentBlock)
+	}
+
+	// No reorg for current block, but during replay it's possible that current block is older than the latest block, let's check it too to avoid false positives on finality violation.
+	latestBlockDB, err1 := lp.orm.SelectLatestBlock(ctx)
+	if err1 != nil {
+		if pkgerrors.Is(err1, sql.ErrNoRows) {
+			lp.lggr.Criticalw("Unexpected state. Expected at least one block to be present in the db when checking for reorg during replay, but got no rows", "currentBlockNumber", currentBlockNumber, "err", err1)
+		}
+		return nil, pkgerrors.Wrap(err1, "unable to get latest block")
+	}
+
+	if currentBlock.BlockNumber() >= latestBlockDB.BlockNumber {
+		// currentBlock is newest, nothing more to check
+		return currentBlock, nil
+	}
+
+	latestBlockRPC, err := lp.headerByNumber(ctx, latestBlockDB.BlockNumber)
+	if err != nil {
+		return nil, fmt.Errorf("unable to get latest block header for block number %d: %w", latestBlockDB.BlockNumber, err)
+	}
+
+	if latestBlockRPC.Hash != latestBlockDB.BlockHash {
+		// Reorg detected, handle it
+		lca, err := lp.handleReorg(ctx, latestBlockRPC)
+		if err != nil {
+			return nil, fmt.Errorf("failed to handle reorg: %w", err)
 		}
 
-		lp.lggr.Infow("Reorg detected", "blockAfterLCA", blockAfterLCA.Number, "currentBlockNumber", currentBlockNumber)
-		// We truncate all the blocks and logs after the LCA.
-		// We could preserve the logs for forensics, since its possible
-		// that applications see them and take action upon it, however that
-		// results in significantly slower reads since we must then compute
-		// the canonical set per read. Typically, if an application took action on a log
-		// it would be saved elsewhere e.g. evm.txes, so it seems better to just support the fast reads.
-		// Its also nicely analogous to reading from the chain itself.
-		err2 = lp.orm.DeleteLogsAndBlocksAfter(ctx, blockAfterLCA.Number)
-		if err2 != nil {
-			// If we error on db commit, we can't know if the tx went through or not.
-			// We return an error here which will cause us to restart polling from lastBlockSaved + 1
-			return nil, err2
+		if lca.Number < currentBlock.BlockNumber() {
+			// LCA is older than current block, we need to get the new current block after reorg
+			return lca, nil
 		}
-		return blockAfterLCA, nil
 	}
-	// No reorg, return current block.
+
 	return currentBlock, nil
 }
 
+func (lp *logPoller) handleReorg(ctx context.Context, currentBlock *evmtypes.Head) (*evmtypes.Head, error) {
+	// during replay currentBlock may be older than the latest block, thus it's possible to miss finality violation,
+	// if we use its view on latest finalized block. To be safe, we get the latest block from the db.
+	latestBlock, err := lp.orm.SelectLatestBlock(ctx)
+	if err != nil {
+		if pkgerrors.Is(err, sql.ErrNoRows) {
+			lp.lggr.Criticalw("Unexpected state. Expected at least one block to be present in the db when handling reorg, but got no rows", "currentBlockNumber", currentBlock.Number, "err", err)
+		}
+		return nil, pkgerrors.Wrap(err, "failed to get latest finalized block from db")
+	}
+	// There can be another reorg while we're finding the LCA.
+	// That is ok, since we'll detect it on the next iteration.
+	// Since we go currentBlock by currentBlock for unfinalized logs, the mismatch starts at currentBlockNumber - 1.
+	blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock, latestBlock.FinalizedBlockNumber)
+	if err2 != nil {
+		return nil, fmt.Errorf("unable to find LCA after reorg: %w", err2)
+	}
+
+	lp.lggr.Infow("Reorg detected", "blockAfterLCA", blockAfterLCA.Number, "currentBlockNumber", currentBlock.Number)
+	// We truncate all the blocks and logs after the LCA.
+	// We could preserve the logs for forensics, since its possible
+	// that applications see them and take action upon it, however that
+	// results in significantly slower reads since we must then compute
+	// the canonical set per read. Typically, if an application took action on a log
+	// it would be saved elsewhere e.g. evm.txes, so it seems better to just support the fast reads.
+	// Its also nicely analogous to reading from the chain itself.
+	err2 = lp.orm.DeleteLogsAndBlocksAfter(ctx, blockAfterLCA.Number)
+	if err2 != nil {
+		// If we error on db commit, we can't know if the tx went through or not.
+		// We return an error here which will cause us to restart polling from lastBlockSaved + 1
+		return nil, err2
+	}
+	return blockAfterLCA, nil
+}
+
 // PollAndSaveLogs On startup/crash current is the first block after the last processed block.
 // currentBlockNumber is the block from where new logs are to be polled & saved. Under normal
 // conditions this would be equal to lastProcessed.BlockNumber + 1.
-func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) {
-	err := lp.pollAndSaveLogs(ctx, currentBlockNumber)
+func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64, isReplay bool) {
+	err := lp.pollAndSaveLogs(ctx, currentBlockNumber, isReplay)
 	if errors.Is(err, commontypes.ErrFinalityViolated) {
 		lp.lggr.Criticalw("Failed to poll and save logs due to finality violation, retrying later", "err", err)
 		lp.finalityViolated.Store(true)
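
To make the new replay-time behaviour easier to follow outside the diff, here is a small self-contained Go sketch that models only the ordering of the checks getCurrentBlockMaybeHandleReorg now performs. The Head and store types and the map-based RPC are simplified stand-ins invented for illustration; they are not the package's evmtypes.Head or ORM.

package main

import "fmt"

// Head is a simplified stand-in for a block header.
type Head struct {
	Number     int64
	Hash       string
	ParentHash string
}

// store models the subset of the log poller's DB used by the reorg checks.
type store struct {
	blocks map[int64]Head // saved blocks by number
	latest int64          // highest saved block number
}

// checkReorg mirrors the order of checks added in this commit:
//  1. parent-hash mismatch against the previously saved block -> reorg
//  2. normal polling (isReplay == false) stops here
//  3. replay only: the saved copy of the current block must match the RPC hash
//  4. replay only: the latest saved block must still be canonical on the RPC
func checkReorg(db store, rpc map[int64]Head, current Head, isReplay bool) string {
	if parent, ok := db.blocks[current.Number-1]; ok && current.ParentHash != parent.Hash {
		return "reorg: parent hash mismatch"
	}
	if !isReplay {
		// During normal polling the DB holds nothing newer than current,
		// so there is nothing further to compare against.
		return "ok"
	}
	if saved, ok := db.blocks[current.Number]; ok && saved.Hash != current.Hash {
		return "reorg: saved copy of current block differs from RPC"
	}
	if current.Number >= db.latest {
		return "ok" // current block is the newest we have saved
	}
	if db.blocks[db.latest].Hash != rpc[db.latest].Hash {
		return "reorg: latest saved block is no longer canonical"
	}
	return "ok"
}

func main() {
	db := store{
		blocks: map[int64]Head{
			9:  {Number: 9, Hash: "a9"},
			10: {Number: 10, Hash: "a10", ParentHash: "a9"},
			11: {Number: 11, Hash: "a11", ParentHash: "a10"},
		},
		latest: 11,
	}
	// The RPC has reorged block 11 away while we replay block 10.
	rpc := map[int64]Head{
		10: {Number: 10, Hash: "a10", ParentHash: "a9"},
		11: {Number: 11, Hash: "b11", ParentHash: "a10"},
	}
	fmt.Println(checkReorg(db, rpc, rpc[10], false)) // ok (normal polling skips the deep checks)
	fmt.Println(checkReorg(db, rpc, rpc[10], true))  // reorg: latest saved block is no longer canonical
}
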
@@ -1048,7 +1117,7 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
 	}
 }
 
-func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int64) (err error) {
+func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int64, isReplay bool) (err error) {
 	lp.lggr.Debugw("Polling for logs", "currentBlockNumber", currentBlockNumber)
 	// Intentionally not using logPoller.finalityDepth directly but the latestFinalizedBlockNumber returned from lp.latestBlocks()
 	// latestBlocks knows how to pick a proper latestFinalizedBlockNumber based on the logPoller's configuration
@@ -1078,7 +1147,7 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int
 	}
 	// Possibly handle a reorg. For example if we crash, we'll be in the middle of processing unfinalized blocks.
 	// Returns (currentBlock || LCA+1 if reorg detected, error)
-	currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, currentBlock)
+	currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, currentBlock, isReplay)
 	if err != nil {
 		// If there's an error handling the reorg, we can't be sure what state the db was left in.
 		// Resume from the latest block saved and retry.
@@ -1104,7 +1173,7 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int
 
 	for {
 		if currentBlockNumber > currentBlock.Number {
-			currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil)
+			currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil, isReplay)
 			if err != nil {
 				// If there's an error handling the reorg, we can't be sure what state the db was left in.
 				// Resume from the latest block saved.
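
One design point worth noting from the hunks above: handleReorg now takes the finality bound for the LCA search from the latest block saved in the DB (SelectLatestBlock), not from expectedParent as the old inline code did. During a replay the block being re-processed can be far behind the head, so its stored finalized-block pointer is stale and could let the LCA search walk past blocks that are in fact final, silently missing a finality violation. A tiny self-contained illustration of the difference (all numbers invented, and assuming, as the surrounding code suggests, that findBlockAfterLCA treats an LCA below the finalized height as a finality violation):

package main

import "fmt"

func main() {
	// Replaying block 100 while the DB already holds blocks up to 150.
	staleBound := int64(95)  // finalized height recorded when block 100 was first saved (illustrative)
	freshBound := int64(140) // finalized height recorded on the latest saved block (illustrative)

	lca := int64(120) // hypothetical point where the RPC chain and our saved chain last agree

	// An LCA below the finality bound means finalized history was rewritten.
	fmt.Println("stale bound flags violation:", lca < staleBound) // false: violation would be missed
	fmt.Println("fresh bound flags violation:", lca < freshBound) // true: violation is caught
}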

pkg/logpoller/log_poller_internal_test.go

Lines changed: 6 additions & 6 deletions
@@ -271,7 +271,7 @@ func assertBackupPollerStartup(t *testing.T, head *evmtypes.Head, finalizedHead
 	assert.Equal(t, int64(0), lp.backupPollerNextBlock)
 	assert.Equal(t, 1, observedLogs.FilterMessageSnippet("ran before first successful log poller run").Len())
 
-	lp.PollAndSaveLogs(ctx, head.Number)
+	lp.PollAndSaveLogs(ctx, head.Number, false)
 
 	lastProcessed, err := lp.orm.SelectLatestBlock(ctx)
 	require.NoError(t, err)
@@ -380,7 +380,7 @@ func TestLogPoller_Replay(t *testing.T) {
 	{
 		ctx := testutils.Context(t)
 		// process 1 log in block 3
-		lp.PollAndSaveLogs(ctx, 4)
+		lp.PollAndSaveLogs(ctx, 4, false)
 		latest, err := lp.LatestBlock(ctx)
 		require.NoError(t, err)
 		require.Equal(t, int64(4), latest.BlockNumber)
@@ -785,7 +785,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) {
 		// insert finalized block with different hash than in RPC
 		require.NoError(t, orm.InsertBlock(t.Context(), common.HexToHash("0x123"), 2, time.Unix(10, 0), 2, 2))
 		lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
-		lp.PollAndSaveLogs(t.Context(), 4)
+		lp.PollAndSaveLogs(t.Context(), 4, false)
 		require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated)
 	})
 	t.Run("RPCs contradict each other and return different finalized blocks", func(t *testing.T) {
@@ -806,7 +806,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) {
 			return evmtypes.Head{Number: num, Hash: utils.NewHash()}
 		})
 		lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
-		lp.PollAndSaveLogs(t.Context(), 4)
+		lp.PollAndSaveLogs(t.Context(), 4, false)
 		require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated)
 	})
 	t.Run("Log's hash does not match block's", func(t *testing.T) {
@@ -824,7 +824,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) {
 		ec.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return([]types.Log{{BlockNumber: 5, BlockHash: common.HexToHash("0x123")}}, nil).Once()
 		mockBatchCallContext(t, ec)
 		lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
-		lp.PollAndSaveLogs(t.Context(), 4)
+		lp.PollAndSaveLogs(t.Context(), 4, false)
 		require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated)
 	})
 	t.Run("Happy path", func(t *testing.T) {
@@ -844,7 +844,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) {
 		ec.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return([]types.Log{{BlockNumber: 5, BlockHash: common.BigToHash(big.NewInt(5)), Topics: []common.Hash{{}}}}, nil).Once()
 		mockBatchCallContext(t, ec)
 		lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
-		lp.PollAndSaveLogs(t.Context(), 4)
+		lp.PollAndSaveLogs(t.Context(), 4, false)
 		require.NoError(t, lp.HealthReport()[lp.Name()])
 	})
 }

pkg/logpoller/log_poller_test.go

Lines changed: 7 additions & 7 deletions
@@ -377,7 +377,7 @@ func Test_BackupLogPoller(t *testing.T) {
 	th.finalizeThroughBlock(t, 64)
 
 	// Run ordinary poller + backup poller at least once more
-	th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber)
+	th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber, false)
 	require.NoError(t, th.LogPoller.BackupPollAndSaveLogs(ctx))
 	currentBlock, err := th.LogPoller.LatestBlock(ctx)
 	require.NoError(t, err)
@@ -718,7 +718,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
 		backend.Commit()
 	}
 	currentBlockNumber := int64(1)
-	lp.PollAndSaveLogs(testutils.Context(t), currentBlockNumber)
+	lp.PollAndSaveLogs(testutils.Context(t), currentBlockNumber, false)
 	currentBlock, err := lp.LatestBlock(testutils.Context(t))
 	require.NoError(t, err)
 	matchesGeth := func() bool {
@@ -770,7 +770,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
 			require.NoError(t, err1)
 			t.Logf("New latest (%v, %x), latest parent %x)\n", latest.NumberU64(), latest.Hash(), latest.ParentHash())
 		}
-		lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber)
+		lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber, false)
 		currentBlock, err = lp.LatestBlock(testutils.Context(t))
 		require.NoError(t, err)
 	}
@@ -1370,7 +1370,7 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) {
 	assert.Equal(t, 3, int(rpcBlocks2[1].FinalizedBlockNumber))
 
 	// after calling PollAndSaveLogs, block 3 (latest finalized block) is persisted in DB
-	th.LogPoller.PollAndSaveLogs(testutils.Context(t), 1)
+	th.LogPoller.PollAndSaveLogs(testutils.Context(t), 1, false)
 	block, err := th.ORM.SelectBlockByNumber(testutils.Context(t), 3)
 	require.NoError(t, err)
 	assert.Equal(t, 3, int(block.BlockNumber))
@@ -1621,7 +1621,7 @@ func TestTooManyLogResults(t *testing.T) {
 		Addresses: []common.Address{addr},
 	})
 	require.NoError(t, err)
-	lp.PollAndSaveLogs(ctx, 5)
+	lp.PollAndSaveLogs(ctx, 5, false)
 	block, err2 := o.SelectLatestBlock(ctx)
 	require.NoError(t, err2)
 	assert.Equal(t, int64(298), block.BlockNumber)
@@ -1653,7 +1653,7 @@ func TestTooManyLogResults(t *testing.T) {
 		return []types.Log{}, tooLargeErr // return "too many results" error if block range spans 4 or more blocks
 	})
 
-	lp.PollAndSaveLogs(ctx, 298)
+	lp.PollAndSaveLogs(ctx, 298, false)
 	block, err := o.SelectLatestBlock(ctx)
 	if err != nil {
 		require.ErrorContains(t, err, "no rows") // In case this subtest is run by itself
@@ -1687,7 +1687,7 @@ func TestTooManyLogResults(t *testing.T) {
 	headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once()
 	headTracker.On("LatestSafeBlock", mock.Anything).Return(finalized, nil).Once()
 
-	lp.PollAndSaveLogs(ctx, 298)
+	lp.PollAndSaveLogs(ctx, 298, false)
 	block, err := o.SelectLatestBlock(ctx)
 	if err != nil {
 		require.ErrorContains(t, err, "no rows") // In case this subtest is run by itself
