Skip to content

Commit 3adef88

Browse files
authored
Merge pull request #2352 from CortexFoundation/dev
core/filtermaps: fix log index initialization
2 parents 4f8bdb9 + 4a0ddfc commit 3adef88

3 files changed

Lines changed: 48 additions & 41 deletions

File tree

core/filtermaps/chain_view.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -135,14 +135,6 @@ func (cv *ChainView) SharedRange(cv2 *ChainView) common.Range[uint64] {
135135
return common.NewRange(0, sharedLen)
136136
}
137137

138-
// limitedView returns a new chain view that is a truncated version of the parent view.
139-
func (cv *ChainView) limitedView(newHead uint64) *ChainView {
140-
if newHead >= cv.headNumber {
141-
return cv
142-
}
143-
return NewChainView(cv.chain, newHead, cv.BlockHash(newHead))
144-
}
145-
146138
// equalViews returns true if the two chain views are equivalent.
147139
func equalViews(cv1, cv2 *ChainView) bool {
148140
if cv1 == nil || cv2 == nil {

core/filtermaps/filtermaps.go

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ var (
5050
)
5151

5252
const (
53-
databaseVersion = 1 // reindexed if database version does not match
53+
databaseVersion = 2 // reindexed if database version does not match
5454
cachedLastBlocks = 1000 // last block of map pointers
5555
cachedLvPointers = 1000 // first log value pointer of block pointers
5656
cachedBaseRows = 100 // groups of base layer filter row data
@@ -244,6 +244,8 @@ func NewFilterMaps(db ctxcdb.KeyValueStore, initView *ChainView, historyCutoff,
244244
disabledCh: make(chan struct{}),
245245
exportFileName: config.ExportFileName,
246246
Params: params,
247+
targetView: initView,
248+
indexedView: initView,
247249
indexedRange: filterMapsRange{
248250
initialized: initialized,
249251
headIndexed: rs.HeadIndexed,
@@ -265,16 +267,8 @@ func NewFilterMaps(db ctxcdb.KeyValueStore, initView *ChainView, historyCutoff,
265267
baseRowsCache: lru.NewCache[uint64, [][]uint32](cachedBaseRows),
266268
renderSnapshots: lru.NewCache[uint64, *renderedMap](cachedRenderSnapshots),
267269
}
270+
f.checkRevertRange() // revert maps that are inconsistent with the current chain view
268271

269-
// Set initial indexer target.
270-
f.targetView = initView
271-
if f.indexedRange.initialized {
272-
f.indexedView = f.initChainView(f.targetView)
273-
f.indexedRange.headIndexed = f.indexedRange.blocks.AfterLast() == f.indexedView.HeadNumber()+1
274-
if !f.indexedRange.headIndexed {
275-
f.indexedRange.headDelimiter = 0
276-
}
277-
}
278272
if f.indexedRange.hasIndexedBlocks() {
279273
log.Info("Initialized log indexer",
280274
"first block", f.indexedRange.blocks.First(), "last block", f.indexedRange.blocks.Last(),
@@ -303,29 +297,40 @@ func (f *FilterMaps) Stop() {
303297
f.closeWg.Wait()
304298
}
305299

306-
// initChainView returns a chain view consistent with both the current target
307-
// view and the current state of the log index as found in the database, based
308-
// on the last block of stored maps.
309-
// Note that the returned view might be shorter than the existing index if
310-
// the latest maps are not consistent with targetView.
311-
func (f *FilterMaps) initChainView(chainView *ChainView) *ChainView {
312-
mapIndex := f.indexedRange.maps.AfterLast()
313-
for {
314-
var ok bool
315-
mapIndex, ok = f.lastMapBoundaryBefore(mapIndex)
316-
if !ok {
317-
break
300+
// checkRevertRange checks whether the existing index is consistent with the
301+
// current indexed view and reverts inconsistent maps if necessary.
302+
func (f *FilterMaps) checkRevertRange() {
303+
if f.indexedRange.maps.Count() == 0 {
304+
return
305+
}
306+
lastMap := f.indexedRange.maps.Last()
307+
lastBlockNumber, lastBlockId, err := f.getLastBlockOfMap(lastMap)
308+
if err != nil {
309+
log.Error("Error initializing log index database; resetting log index", "error", err)
310+
f.reset()
311+
return
312+
}
313+
for lastBlockNumber > f.indexedView.HeadNumber() || f.indexedView.BlockId(lastBlockNumber) != lastBlockId {
314+
// revert last map
315+
if f.indexedRange.maps.Count() == 1 {
316+
f.reset() // reset database if no rendered maps remained
317+
return
318318
}
319-
lastBlockNumber, lastBlockId, err := f.getLastBlockOfMap(mapIndex)
319+
lastMap--
320+
newRange := f.indexedRange
321+
newRange.maps.SetLast(lastMap)
322+
lastBlockNumber, lastBlockId, err = f.getLastBlockOfMap(lastMap)
320323
if err != nil {
321-
log.Error("Could not initialize indexed chain view", "error", err)
322-
break
323-
}
324-
if lastBlockNumber <= chainView.HeadNumber() && chainView.BlockId(lastBlockNumber) == lastBlockId {
325-
return chainView.limitedView(lastBlockNumber)
324+
log.Error("Error initializing log index database; resetting log index", "error", err)
325+
f.reset()
326+
return
326327
}
328+
newRange.blocks.SetAfterLast(lastBlockNumber) // lastBlockNumber is probably partially indexed
329+
newRange.headIndexed = false
330+
newRange.headDelimiter = 0
331+
// only shorten range and leave map data; next head render will overwrite it
332+
f.setRange(f.db, f.indexedView, newRange, false)
327333
}
328-
return chainView.limitedView(0)
329334
}
330335

331336
// reset un-initializes the FilterMaps structure and removes all related data from

core/filtermaps/map_renderer.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -468,15 +468,25 @@ func (r *mapRenderer) writeFinishedMaps(pauseCb func() bool) error {
468468
r.f.filterMapCache.Remove(mapIndex)
469469
}
470470
}
471+
var blockNumber uint64
472+
if r.finished.First() > 0 {
473+
// in order to always ensure continuous block pointers, initialize
474+
// blockNumber based on the last block of the previous map, then verify
475+
// against the first block associated with each rendered map
476+
lastBlock, _, err := r.f.getLastBlockOfMap(r.finished.First() - 1)
477+
if err != nil {
478+
return fmt.Errorf("failed to get last block of previous map %d: %v", r.finished.First()-1, err)
479+
}
480+
blockNumber = lastBlock + 1
481+
}
471482
// add or update block pointers
472-
blockNumber := r.finishedMaps[r.finished.First()].firstBlock()
473483
for mapIndex := range r.finished.Iter() {
474484
renderedMap := r.finishedMaps[mapIndex]
475-
r.f.storeLastBlockOfMap(batch, mapIndex, renderedMap.lastBlock, renderedMap.lastBlockId)
476-
checkWriteCnt()
477485
if blockNumber != renderedMap.firstBlock() {
478-
panic("non-continuous block numbers")
486+
return fmt.Errorf("non-continuous block numbers in rendered map %d (next expected: %d first rendered: %d)", mapIndex, blockNumber, renderedMap.firstBlock())
479487
}
488+
r.f.storeLastBlockOfMap(batch, mapIndex, renderedMap.lastBlock, renderedMap.lastBlockId)
489+
checkWriteCnt()
480490
for _, lvPtr := range renderedMap.blockLvPtrs {
481491
r.f.storeBlockLvPointer(batch, blockNumber, lvPtr)
482492
checkWriteCnt()

0 commit comments

Comments
 (0)