Skip to content

Commit ec81de1

Browse files
authored
Fix Replay Reorg Detection Bypassed When Parent Block Is Missing (#422)
* Test: reorg during replay
* Fix reorg handling during replay
* Fix nits
* ORM methods to batch insert
* switch to batch logs/blocks inserts
* nit fixes
* switch ubig to sqlutil big
* fix orm_test
* Refactor tests in preparation for sparse blocks
* Store only blocks with logs and checkpoints
* Add feature flag to disable/enable sparse blocks storage
* Fix Replay Reorg Detection Bypassed When Parent Block Is Missing
* Fix flaky test
* Improve backfill tests
* Fix linter issues
1 parent 91a892a commit ec81de1

5 files changed

Lines changed: 2675 additions & 52 deletions

File tree

pkg/.mockery.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,13 @@ packages:
9292
github.com/smartcontractkit/chainlink-evm/pkg/logpoller:
9393
interfaces:
9494
LogPoller:
95+
ORM:
96+
config:
97+
dir: "{{ .InterfaceDir }}"
98+
outpkg: logpoller
99+
filename: "mock_orm_test.go"
100+
mockname: "MockORM"
101+
inpackage: true
95102
github.com/smartcontractkit/chainlink-evm/pkg/round:
96103
interfaces:
97104
RequestRoundDB:

pkg/logpoller/helper_test.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -176,16 +176,6 @@ func (th *TestHarness) assertDontHave(t *testing.T, start, end int) {
176176
}
177177
}
178178

179-
func (th *TestHarness) assertHaveCanonical(t *testing.T, start, end int) {
180-
for i := start; i < end; i++ {
181-
blk, err := th.ORM.SelectBlockByNumber(testutils.Context(t), int64(i))
182-
require.NoError(t, err, "block %v", i)
183-
chainBlk, err := th.Client.BlockByNumber(testutils.Context(t), big.NewInt(int64(i)))
184-
require.NoError(t, err)
185-
assert.Equal(t, chainBlk.Hash().String(), blk.BlockHash.String(), "block %v", i)
186-
}
187-
}
188-
189179
// Simulates an RPC failover event to an alternate rpc server. This can also be used to
190180
// simulate switching back to the primary rpc after it recovers.
191181
func (th *TestHarness) SetActiveClient(backend evmtypes.Backend, chainType chaintype.ChainType) {

pkg/logpoller/log_poller.go

Lines changed: 42 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1005,37 +1005,55 @@ func (lp *logPoller) headerByNumber(ctx context.Context, blockNumber int64) (*ev
10051005
// 2. Delete all logs and blocks after the LCA
10061006
// 3. Return the LCA+1, i.e. our new current (unprocessed) block.
10071007
func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, currentBlockNumber int64, currentBlock *evmtypes.Head, isReplay bool) (head *evmtypes.Head, err error) {
1008-
var err1 error
10091008
if currentBlock == nil {
1010-
currentBlock, err1 = lp.headerByNumber(ctx, currentBlockNumber)
1011-
if err1 != nil {
1012-
return nil, fmt.Errorf("unable to get current block header for block number %d: %w", currentBlockNumber, err1)
1009+
currentBlock, err = lp.headerByNumber(ctx, currentBlockNumber)
1010+
if err != nil {
1011+
return nil, fmt.Errorf("unable to get current block header for block number %d: %w", currentBlockNumber, err)
10131012
}
10141013
}
10151014
// Does this currentBlock point to the same parent that we have saved?
10161015
// If not, there was a reorg, so we need to rewind.
1017-
expectedParent, err1 := lp.orm.SelectBlockByNumber(ctx, currentBlockNumber-1)
1018-
if err1 != nil && !pkgerrors.Is(err1, sql.ErrNoRows) {
1016+
expectedParent, err := lp.orm.SelectBlockByNumber(ctx, currentBlockNumber-1)
1017+
if err != nil && !pkgerrors.Is(err, sql.ErrNoRows) {
10191018
// If err is not a 'no rows' error, assume transient db issue and retry
1020-
lp.lggr.Warnw("Unable to read latestBlockNumber currentBlock saved", "err", err1, "currentBlockNumber", currentBlockNumber)
1019+
lp.lggr.Warnw("Unable to read latestBlockNumber currentBlock saved", "err", err, "currentBlockNumber", currentBlockNumber)
10211020
return nil, pkgerrors.New("Unable to read latestBlockNumber currentBlock saved")
10221021
}
10231022
// We will not have the previous currentBlock on initial poll.
1024-
havePreviousBlock := err1 == nil
1025-
if !havePreviousBlock {
1026-
lp.lggr.Infow("Do not have previous block, first poll ever on new chain", "currentBlockNumber", currentBlockNumber)
1027-
return currentBlock, nil
1028-
}
1029-
// Check for reorg.
1030-
if currentBlock.ParentHash != expectedParent.BlockHash {
1031-
return lp.handleReorg(ctx, currentBlock)
1032-
}
1023+
havePreviousBlock := err == nil
1024+
if havePreviousBlock {
1025+
// Check for reorg.
1026+
if currentBlock.ParentHash != expectedParent.BlockHash {
1027+
return lp.handleReorg(ctx, currentBlock)
1028+
}
10331029

1034-
if !isReplay {
1035-
// During normal polling DB does not have any blocks after currentBlockNumber, so no reorg is possible. We can skip extra checks and just return currentBlock.
1036-
return currentBlock, nil
1030+
if !isReplay {
1031+
// During normal polling DB does not have any blocks after currentBlockNumber, so no reorg is possible. We can skip extra checks and just return currentBlock.
1032+
return currentBlock, nil
1033+
}
1034+
} else {
1035+
// previous block is not present, it could only happen in case of initial poll or replay.
1036+
// If that's not a replay, let's double-check that it's an initial poll by verifying that the DB is empty.
1037+
if !isReplay {
1038+
_, err = lp.orm.SelectLatestBlock(ctx)
1039+
if err == nil {
1040+
err = fmt.Errorf("unexpected state: no previous block found for block number %d, but db is not empty", currentBlockNumber)
1041+
lp.lggr.Criticalw("Invariant violation: expected to always have previous block except replay and first poll for a new chain", "currentBlockNumber", currentBlockNumber, "err", err)
1042+
return nil, err
1043+
}
1044+
1045+
if !errors.Is(err, sql.ErrNoRows) {
1046+
// Real DB error
1047+
return nil, fmt.Errorf("failed to check for first poll ever on new chain: %w", err)
1048+
}
1049+
1050+
// This is expected, we just don't have any blocks in the DB yet.
1051+
lp.lggr.Infow("Do not have previous block, first poll ever on new chain", "currentBlockNumber", currentBlockNumber)
1052+
return currentBlock, nil
1053+
}
10371054
}
10381055

1056+
// In case of replay currentBlock may be in the middle of DB range, let's do additional checks to handle possible reorgs.
10391057
// Ensure that if DB contains current block it matches the current block from RPC.
10401058
currentBlockDB, err := lp.orm.SelectBlockByNumber(ctx, currentBlockNumber)
10411059
if err != nil && !pkgerrors.Is(err, sql.ErrNoRows) {
@@ -1047,12 +1065,12 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren
10471065
}
10481066

10491067
// No reorg for current block, but during replay it's possible that current block is older than the latest block, let's check it too to avoid false positives on finality violation.
1050-
latestBlockDB, err1 := lp.orm.SelectLatestBlock(ctx)
1051-
if err1 != nil {
1052-
if pkgerrors.Is(err1, sql.ErrNoRows) {
1053-
lp.lggr.Criticalw("Unexpected state. Expected at least one block to be present in the db when checking for reorg during replay, but got no rows", "currentBlockNumber", currentBlockNumber, "err", err1)
1068+
latestBlockDB, err := lp.orm.SelectLatestBlock(ctx)
1069+
if err != nil {
1070+
if pkgerrors.Is(err, sql.ErrNoRows) {
1071+
lp.lggr.Criticalw("Unexpected state. Expected at least one block to be present in the db when checking for reorg during replay, but got no rows", "currentBlockNumber", currentBlockNumber, "err", err)
10541072
}
1055-
return nil, pkgerrors.Wrap(err1, "unable to get latest block")
1073+
return nil, pkgerrors.Wrap(err, "unable to get latest block")
10561074
}
10571075

10581076
if currentBlock.BlockNumber() >= latestBlockDB.BlockNumber {

0 commit comments

Comments
 (0)