Skip to content

Commit f6d1995

Browse files
authored
[M-01] Fix Non-atomic Reorg Cleanup in DeleteLogsAndBlocksAfter (#421)
1 parent 51b1fce commit f6d1995

2 files changed

Lines changed: 50 additions & 4 deletions

File tree

pkg/logpoller/orm.go

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -396,7 +396,7 @@ func (o *DSORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error
396396
// If not applied, these queries can become very slow. After some critical number
397397
// of logs, Postgres will try to scan all the logs in the index by block_number.
398398
// Latency without upper bound filter can be orders of magnitude higher for large number of logs.
399-
_, err := o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks
399+
_, err := orm.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks
400400
WHERE evm_chain_id = $1
401401
AND block_number >= $2
402402
AND block_number <= (SELECT MAX(block_number)
@@ -408,10 +408,10 @@ func (o *DSORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error
408408
return err
409409
}
410410

411-
_, err = o.ds.ExecContext(ctx, `DELETE FROM evm.logs
411+
_, err = orm.ds.ExecContext(ctx, `DELETE FROM evm.logs
412412
WHERE evm_chain_id = $1
413-
AND block_number >= $2
414-
AND block_number <= (SELECT MAX(block_number) FROM evm.logs WHERE evm_chain_id = $1)`,
413+
AND block_number >= $2
414+
AND block_number <= (SELECT MAX(block_number) FROM evm.logs WHERE evm_chain_id = $1)`,
415415
sqlutil.New(o.chainID), start)
416416
if err != nil {
417417
o.lggr.Warnw("Unable to clear reorged logs, retrying", "err", err)

pkg/logpoller/orm_test.go

Lines changed: 46 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -9,10 +9,12 @@ import (
99
"math"
1010
"math/big"
1111
"strconv"
12+
"sync/atomic"
1213
"testing"
1314
"time"
1415

1516
"github.com/ethereum/go-ethereum/common"
17+
"github.com/jmoiron/sqlx"
1618
pkgerrors "github.com/pkg/errors"
1719
"github.com/stretchr/testify/assert"
1820
"github.com/stretchr/testify/mock"
@@ -2463,3 +2465,47 @@ func TestSelectLatestFinalizedBlock(t *testing.T) {
24632465
require.Equal(t, common.HexToHash("0x1231"), result.BlockHash)
24642466
})
24652467
}
2468+
2469+
// execCountingSQLxDB wraps *sqlx.DB as a [sqlutil.DataSource] and counts how many
2470+
// ExecContext calls hit the pool object directly. Executions on a [*sqlx.Tx]
2471+
// returned from BeginTxx do not increment this counter.
2472+
//
2473+
// This guards a regression where [logpoller.DSORM.DeleteLogsAndBlocksAfter]
2474+
// used the outer ORM's DataSource inside a [logpoller.DSORM.Transact] callback
2475+
// instead of the transactional ORM's DataSource, so the two DELETEs were not
2476+
// part of the same database transaction.
2477+
type execCountingSQLxDB struct {
2478+
*sqlx.DB
2479+
2480+
poolExecContext atomic.Int32
2481+
}
2482+
2483+
func (d *execCountingSQLxDB) PoolExecContextCount() int32 {
2484+
return d.poolExecContext.Load()
2485+
}
2486+
2487+
func (d *execCountingSQLxDB) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) {
2488+
d.poolExecContext.Add(1)
2489+
return d.DB.ExecContext(ctx, query, args...)
2490+
}
2491+
2492+
var _ sqlutil.DataSource = (*execCountingSQLxDB)(nil)
2493+
2494+
func TestDSORM_DeleteLogsAndBlocksAfter_usesTransactionalDataSource(t *testing.T) {
2495+
t.Parallel()
2496+
testutils.SkipShortDB(t)
2497+
2498+
ctx := testutils.Context(t)
2499+
base := testutils.NewSqlxDB(t)
2500+
require.NotNil(t, base)
2501+
wrapped := &execCountingSQLxDB{DB: base}
2502+
2503+
lggr := logger.Test(t)
2504+
orm := logpoller.NewORM(testutils.NewRandomEVMChainID(), wrapped, lggr)
2505+
2506+
require.NoError(t, orm.DeleteLogsAndBlocksAfter(ctx, 1))
2507+
2508+
require.Zero(t, wrapped.PoolExecContextCount(),
2509+
"DeleteLogsAndBlocksAfter must run DELETEs on the transaction DataSource (orm.ds), not the outer pool; "+
2510+
"otherwise the two deletes are not atomic and this counter would be 2")
2511+
}

0 commit comments

Comments
 (0)