@@ -2152,3 +2152,113 @@ func TestWhere(t *testing.T) {
 		assert.Equal(t, []query.Expression{}, result)
 	})
 }
+
+func TestLogPoller_Reorg_On_Replay(t *testing.T) {
+	// Test case:
+	// 1. LogPoller processes blocks up to block 11.
+	// 2. A reorg replaces block 11 with a new block (some additional blocks may be added on top of it).
+	// 3. Replay is initiated from a block below 11.
+	// Expected behaviour:
+	// 1. LogPoller should replace the reorged block 11 with the new data.
+	// 2. At no point must the DB contain logs from both the old and the new block 11.
+	// 3. No finality violation must be reported, since the chain did not violate the finality depth.
+	t.Parallel()
+	const reorgedBlockNumber = 11
+	testCases := []struct {
+		name                     string
+		numberOfBlocksAfterReorg int
+	}{
+		{
+			name:                     "Replay start right after reorg",
+			numberOfBlocksAfterReorg: 0,
+		},
+		{
+			name:                     "Replay start a few blocks after reorg",
+			numberOfBlocksAfterReorg: 1,
+		},
+		{
+			name:                     "Replay start once reorged block is finalized",
+			numberOfBlocksAfterReorg: 5,
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			lpOpts := logpoller.Opts{
+				PollPeriod:               24 * time.Hour, // effectively disable automatic polling, so we can control when we poll in the test
+				UseFinalityTag:           false,
+				FinalityDepth:            3,
+				BackfillBatchSize:        3,
+				RPCBatchSize:             2,
+				KeepFinalizedBlocksDepth: 1000,
+			}
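+			// With FinalityDepth of 3, the "Replay start once reorged block is finalized" case
+			// (5 blocks committed after the reorg) leaves the replacement block 11 already finalized
+			// by the time Replay runs, covering the "no finality violation" expectation above.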
+			th := SetupTH(t, lpOpts)
+
+			// Set up a log poller listening for log emitter logs.
+			err := th.LogPoller.RegisterFilter(testutils.Context(t), logpoller.Filter{
+				Name:      "Test Emitter 1",
+				EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID},
+				Addresses: []common.Address{th.EmitterAddress1},
+			})
+			require.NoError(t, err)
+
+			// populate chain with data
+			for range reorgedBlockNumber - 1 {
+				_, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(1))})
+				require.NoError(t, err)
+				th.Backend.Commit()
+			}
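+			// The chain head should now be at block 11, the block the reorg below will replace
+			// (verified once the poller catches up).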
+
+			// start LogPoller and wait for it to complete its first poll. Another poll won't happen on its own until the (very long) poll period elapses, so the test controls when the chain is re-read.
+			require.NoError(t, th.LogPoller.Start(t.Context()))
+			defer func() {
+				require.NoError(t, th.LogPoller.Close())
+			}()
+			testutils.RequireEventually(t, func() bool {
+				latest, err := th.LogPoller.LatestBlock(t.Context())
+				return err == nil && latest.BlockNumber == reorgedBlockNumber
+			})
+
+			reorgedBlock, err := th.Client.BlockByNumber(t.Context(), nil)
+			require.NoError(t, err)
+			require.Equal(t, int64(reorgedBlockNumber), reorgedBlock.Number().Int64())
+
+			// Replace block 11 with a new block and bury it under tc.numberOfBlocksAfterReorg additional blocks
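+			// Backend.Fork rewinds the simulated chain to the parent of the reorged block, so the
+			// Commit calls below build a competing chain in which block 11 is replaced.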
+			require.NoError(t, th.Backend.Fork(reorgedBlock.ParentHash()))
+			const newLogData = int64(123)
+			// Commit the replacement block 11 plus tc.numberOfBlocksAfterReorg blocks on top of it
+			for range tc.numberOfBlocksAfterReorg + 1 {
+				// Emit a log that is not tracked by LP first, so the tracked log gets a different
+				// log index than it had in the original block 11. If the reorg is not handled properly
+				// and logs from both blocks end up in the database, the assertions below will catch it.
+				_, err = th.Emitter2.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(10))})
+				require.NoError(t, err)
+				_, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(newLogData)})
+				require.NoError(t, err)
+				th.Backend.Commit()
+			}
+
+			newReorgedBlock, err := th.Client.BlockByNumber(t.Context(), big.NewInt(reorgedBlockNumber))
+			require.NoError(t, err)
+			require.NotEqual(t, reorgedBlock.Hash().String(), newReorgedBlock.Hash().String())
+
+			latest, err := th.Client.BlockByNumber(t.Context(), nil)
+			require.NoError(t, err)
+			require.Equal(t, int64(tc.numberOfBlocksAfterReorg + reorgedBlockNumber), latest.Number().Int64())
+
+			// Trigger replay, which should gracefully handle the reorg and end up on the new latest block
+			err = th.LogPoller.Replay(t.Context(), 5)
+			require.NoError(t, err)
+			// LP should be on latest block now
+			lpLatest, err := th.LogPoller.LatestBlock(t.Context())
+			require.NoError(t, err)
+			require.Equal(t, int64(tc.numberOfBlocksAfterReorg + reorgedBlockNumber), lpLatest.BlockNumber)
+			logs, err := th.ORM.SelectLogsByBlockRange(t.Context(), reorgedBlockNumber, reorgedBlockNumber)
+			require.NoError(t, err)
+			require.Len(t, logs, 1)
+			require.Equal(t, newLogData, big.NewInt(0).SetBytes(logs[0].Data).Int64(), "Log data should match the log from the new block, indicating that the old block's log was properly removed during replay")
+			// Ensure reorged block was replaced by a new one
+			dbBlock, err := th.ORM.SelectBlockByNumber(testutils.Context(t), reorgedBlock.Number().Int64())
+			require.NoError(t, err)
+			require.Equal(t, reorgedBlock.Number().Int64(), dbBlock.BlockNumber)
+			require.NotEqual(t, reorgedBlock.Hash(), dbBlock.BlockHash)
+		})
+	}
+}