Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions core/filtermaps/filtermaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ var (
)

const (
databaseVersion = 1 // reindexed if database version does not match
cachedLastBlocks = 1000 // last block of map pointers
cachedLvPointers = 1000 // first log value pointer of block pointers
cachedBaseRows = 100 // groups of base layer filter row data
Expand Down Expand Up @@ -138,13 +139,25 @@ type FilterMaps struct {
// as transparent (uncached/unchanged).
type filterMap []FilterRow

// copy returns a copy of the given filter map. Note that the row slices are
// copied but their contents are not. This permits extending the rows further
// fastCopy returns a copy of the given filter map. Note that the row slices are
// copied but their contents are not. This permits appending to the rows further
// (which happens during map rendering) without affecting the validity of
// copies made for snapshots during rendering.
func (fm filterMap) copy() filterMap {
// Appending to the rows of both the original map and the fast copy, or two fast
// copies of the same map would result in data corruption, therefore a fast copy
// should always be used in a read only way.
func (fm filterMap) fastCopy() filterMap {
return slices.Clone(fm)
}

// fullCopy returns a copy of the given filter map, also making a copy of each
// individual filter row, ensuring that a modification to either one will never
// affect the other.
func (fm filterMap) fullCopy() filterMap {
c := make(filterMap, len(fm))
copy(c, fm)
for i, row := range fm {
c[i] = slices.Clone(row)
}
return c
}

Expand Down Expand Up @@ -207,8 +220,9 @@ type Config struct {
// NewFilterMaps creates a new FilterMaps and starts the indexer.
func NewFilterMaps(db ctxcdb.KeyValueStore, initView *ChainView, historyCutoff, finalBlock uint64, params Params, config Config) *FilterMaps {
rs, initialized, err := rawdb.ReadFilterMapsRange(db)
if err != nil {
log.Error("Error reading log index range", "error", err)
if err != nil || rs.Version != databaseVersion {
rs, initialized = rawdb.FilterMapsRange{}, false
log.Warn("Invalid log index database version; resetting log index")
}
params.deriveFields()
f := &FilterMaps{
Expand Down Expand Up @@ -437,6 +451,7 @@ func (f *FilterMaps) setRange(batch ctxcdb.KeyValueWriter, newView *ChainView, n
f.updateMatchersValidRange()
if newRange.initialized {
rs := rawdb.FilterMapsRange{
Version: databaseVersion,
HeadIndexed: newRange.headIndexed,
HeadDelimiter: newRange.headDelimiter,
BlocksFirst: newRange.blocks.First(),
Expand Down
45 changes: 18 additions & 27 deletions core/filtermaps/map_renderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (f *FilterMaps) renderMapsBefore(renderBefore uint32) (*mapRenderer, error)
if err != nil {
return nil, err
}
if snapshot := f.lastCanonicalSnapshotBefore(renderBefore); snapshot != nil && snapshot.mapIndex >= nextMap {
if snapshot := f.lastCanonicalSnapshotOfMap(nextMap); snapshot != nil {
return f.renderMapsFromSnapshot(snapshot)
}
if nextMap >= renderBefore {
Expand All @@ -97,14 +97,14 @@ func (f *FilterMaps) renderMapsBefore(renderBefore uint32) (*mapRenderer, error)
// snapshot made at a block boundary.
func (f *FilterMaps) renderMapsFromSnapshot(cp *renderedMap) (*mapRenderer, error) {
f.testSnapshotUsed = true
iter, err := f.newLogIteratorFromBlockDelimiter(cp.lastBlock)
iter, err := f.newLogIteratorFromBlockDelimiter(cp.lastBlock, cp.headDelimiter)
if err != nil {
return nil, fmt.Errorf("failed to create log iterator from block delimiter %d: %v", cp.lastBlock, err)
}
return &mapRenderer{
f: f,
currentMap: &renderedMap{
filterMap: cp.filterMap.copy(),
filterMap: cp.filterMap.fullCopy(),
mapIndex: cp.mapIndex,
lastBlock: cp.lastBlock,
blockLvPtrs: cp.blockLvPtrs,
Expand Down Expand Up @@ -137,14 +137,14 @@ func (f *FilterMaps) renderMapsFromMapBoundary(firstMap, renderBefore uint32, st
}, nil
}

// lastCanonicalSnapshotBefore returns the latest cached snapshot that matches
// the current targetView.
func (f *FilterMaps) lastCanonicalSnapshotBefore(renderBefore uint32) *renderedMap {
// lastCanonicalSnapshotOfMap returns the latest cached snapshot of the given map
// that is also consistent with the current targetView.
func (f *FilterMaps) lastCanonicalSnapshotOfMap(mapIndex uint32) *renderedMap {
var best *renderedMap
for _, blockNumber := range f.renderSnapshots.Keys() {
if cp, _ := f.renderSnapshots.Get(blockNumber); cp != nil && blockNumber < f.indexedRange.blocks.AfterLast() &&
blockNumber <= f.targetView.headNumber && f.targetView.getBlockId(blockNumber) == cp.lastBlockId &&
cp.mapIndex < renderBefore && (best == nil || blockNumber > best.lastBlock) {
cp.mapIndex == mapIndex && (best == nil || blockNumber > best.lastBlock) {
best = cp
}
}
Expand All @@ -171,10 +171,9 @@ func (f *FilterMaps) lastCanonicalMapBoundaryBefore(renderBefore uint32) (nextMa
if err != nil {
return 0, 0, 0, fmt.Errorf("failed to retrieve last block of reverse iterated map %d: %v", mapIndex, err)
}
if lastBlock >= f.indexedView.headNumber || lastBlock >= f.targetView.headNumber ||
lastBlockId != f.targetView.getBlockId(lastBlock) {
// map is not full or inconsistent with targetView; roll back
continue
if (f.indexedRange.headIndexed && mapIndex >= f.indexedRange.maps.Last()) ||
lastBlock >= f.targetView.headNumber || lastBlockId != f.targetView.getBlockId(lastBlock) {
continue // map is not full or inconsistent with targetView; roll back
}
lvPtr, err := f.getBlockLvPointer(lastBlock)
if err != nil {
Expand Down Expand Up @@ -257,11 +256,14 @@ func (f *FilterMaps) loadHeadSnapshot() error {

// makeSnapshot creates a snapshot of the current state of the rendered map.
func (r *mapRenderer) makeSnapshot() {
r.f.renderSnapshots.Add(r.iterator.blockNumber, &renderedMap{
filterMap: r.currentMap.filterMap.copy(),
if r.iterator.blockNumber != r.currentMap.lastBlock || r.iterator.chainView != r.f.targetView {
panic("iterator state inconsistent with current rendered map")
}
r.f.renderSnapshots.Add(r.currentMap.lastBlock, &renderedMap{
filterMap: r.currentMap.filterMap.fastCopy(),
mapIndex: r.currentMap.mapIndex,
lastBlock: r.iterator.blockNumber,
lastBlockId: r.f.targetView.getBlockId(r.currentMap.lastBlock),
lastBlock: r.currentMap.lastBlock,
lastBlockId: r.iterator.chainView.getBlockId(r.currentMap.lastBlock),
blockLvPtrs: r.currentMap.blockLvPtrs,
finished: true,
headDelimiter: r.iterator.lvIndex,
Expand Down Expand Up @@ -661,24 +663,13 @@ var errUnindexedRange = errors.New("unindexed range")
// newLogIteratorFromBlockDelimiter creates a logIterator starting at the
// given block's first log value entry (the block delimiter), according to the
// current targetView.
func (f *FilterMaps) newLogIteratorFromBlockDelimiter(blockNumber uint64) (*logIterator, error) {
func (f *FilterMaps) newLogIteratorFromBlockDelimiter(blockNumber, lvIndex uint64) (*logIterator, error) {
if blockNumber > f.targetView.headNumber {
return nil, fmt.Errorf("iterator entry point %d after target chain head block %d", blockNumber, f.targetView.headNumber)
}
if !f.indexedRange.blocks.Includes(blockNumber) {
return nil, errUnindexedRange
}
var lvIndex uint64
if f.indexedRange.headIndexed && blockNumber+1 == f.indexedRange.blocks.AfterLast() {
lvIndex = f.indexedRange.headDelimiter
} else {
var err error
lvIndex, err = f.getBlockLvPointer(blockNumber + 1)
if err != nil {
return nil, fmt.Errorf("failed to retrieve log value pointer of block %d after delimiter: %v", blockNumber+1, err)
}
lvIndex--
}
finished := blockNumber == f.targetView.headNumber
l := &logIterator{
chainView: f.targetView,
Expand Down
3 changes: 3 additions & 0 deletions core/filtermaps/matcher_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ func (fm *FilterMapsMatcherBackend) Close() {
// on write.
// GetFilterMapRow implements MatcherBackend.
func (fm *FilterMapsMatcherBackend) GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32, baseLayerOnly bool) (FilterRow, error) {
fm.f.indexLock.RLock()
defer fm.f.indexLock.RUnlock()

return fm.f.getFilterMapRow(mapIndex, rowIndex, baseLayerOnly)
}

Expand Down
1 change: 1 addition & 0 deletions core/rawdb/accessors_indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ func DeleteBloombits(db ctxcdb.Database, bit uint, from uint64, to uint64) {
// FilterMapsRange is a storage representation of the block range covered by the
// filter maps structure and the corresponting log value index range.
type FilterMapsRange struct {
Version uint32
HeadIndexed bool
HeadDelimiter uint64
BlocksFirst, BlocksAfterLast uint64
Expand Down
1 change: 0 additions & 1 deletion ctxc/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ func New(stack *node.Node, config *Config) (*Cortex, error) {

//StateHistory: config.StateHistory,
//StateScheme: scheme,
//HistoryPruningCutoff: historyPruningCutoff,
ChainHistoryMode: config.HistoryMode,
}
)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.0
github.com/CortexFoundation/inference v1.0.2-0.20230307032835-9197d586a4e8
github.com/CortexFoundation/statik v0.0.0-20210315012922-8bb8a7b5dc66
github.com/CortexFoundation/torrentfs v1.0.69-0.20250415134051-0b782fe8b66d
github.com/CortexFoundation/torrentfs v1.0.69-0.20250416125950-ceaf4dd388f2
github.com/VictoriaMetrics/fastcache v1.12.2
github.com/arsham/figurine v1.3.0
github.com/aws/aws-sdk-go-v2 v1.36.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ github.com/CortexFoundation/statik v0.0.0-20210315012922-8bb8a7b5dc66/go.mod h1:
github.com/CortexFoundation/torrentfs v1.0.13-0.20200623060705-ce027f43f2f8/go.mod h1:Ma+tGhPPvz4CEZHaqEJQMOEGOfHeQBiAoNd1zyc/w3Q=
github.com/CortexFoundation/torrentfs v1.0.14-0.20200703071639-3fcabcabf274/go.mod h1:qnb3YlIJmuetVBtC6Lsejr0Xru+1DNmDCdTqnwy7lhk=
github.com/CortexFoundation/torrentfs v1.0.20-0.20200810031954-d36d26f82fcc/go.mod h1:N5BsicP5ynjXIi/Npl/SRzlJ630n1PJV2sRj0Z0t2HA=
github.com/CortexFoundation/torrentfs v1.0.69-0.20250415134051-0b782fe8b66d h1:oZk98KwRFZJHPhztM2yGwHobSa0wls6gdJXHRbSTdPs=
github.com/CortexFoundation/torrentfs v1.0.69-0.20250415134051-0b782fe8b66d/go.mod h1:Lf7iaFiGPvExQiyPufsbQ8hs0XkKUunTLcp0r77/IaY=
github.com/CortexFoundation/torrentfs v1.0.69-0.20250416125950-ceaf4dd388f2 h1:qvdjsYBKOJRrufSDO+DtpnX7TJdugbhrvL7/xhQBBdw=
github.com/CortexFoundation/torrentfs v1.0.69-0.20250416125950-ceaf4dd388f2/go.mod h1:Lf7iaFiGPvExQiyPufsbQ8hs0XkKUunTLcp0r77/IaY=
github.com/CortexFoundation/wormhole v0.0.2-0.20241128010855-a23c88842cfa h1:46VAGWxOwpoLlPNcR9etAhK0NtT215skO9Wl4i14r4o=
github.com/CortexFoundation/wormhole v0.0.2-0.20241128010855-a23c88842cfa/go.mod h1:ipzmPabDgzYKUbXkGVe2gTkBEp+MsDx6pXGiuYzmP6s=
github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
Expand Down
2 changes: 1 addition & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ github.com/CortexFoundation/robot/backend
# github.com/CortexFoundation/statik v0.0.0-20210315012922-8bb8a7b5dc66
## explicit; go 1.16
github.com/CortexFoundation/statik
# github.com/CortexFoundation/torrentfs v1.0.69-0.20250415134051-0b782fe8b66d
# github.com/CortexFoundation/torrentfs v1.0.69-0.20250416125950-ceaf4dd388f2
## explicit; go 1.23.4
github.com/CortexFoundation/torrentfs
github.com/CortexFoundation/torrentfs/backend
Expand Down