Skip to content

Commit 6bf1e15

Browse files
committed
db/kv, db/state, execution: opt-in per-table page-cache warmup
Adds a per-table warmup primitive to help operate chaindata >> RAM: - kv.RoDB.WarmupTable(ctx, table) and kv.TemporalRoDB.Warmup(ctx, force); mdbx impl scans the table with parallel prefix reads, gated by dbg.WarmupTableWorkers (env WARMUP_TABLE_WORKERS, default 0 = disabled). gatedRoDB.WarmupTable is a no-op: warmup's long-lived readers are exactly what the gate keeps off the commit path. - domain/inverted-index collate+prune fire a fire-and-forget WarmupTable on the table they are about to read (skipped when the Aggregator-set db is nil, e.g. standalone test instances). - backup.ClearTables and Kv2kv warm each table while clearing/copying it via a warmupWhile helper that cancels the warmup as soon as the operation returns. ClearTables/ResetSenders gain a db arg; backup.ReadAheadThreads is replaced by dbg.WarmupTableWorkers.
1 parent 6e19e6e commit 6bf1e15

19 files changed

Lines changed: 216 additions & 21 deletions

File tree

cmd/integration/commands/refetence_db.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ var cmdMdbxToMdbx = &cobra.Command{
104104
ctx, _ := common.RootContext()
105105
logger := debug.SetupCobra(cmd, "integration")
106106
from, to := backup.OpenPair(chaindata, toChaindata, dbcfg.ChainDB, 0, logger)
107-
err := backup.Kv2kv(ctx, from, to, nil, backup.ReadAheadThreads, logger)
107+
err := backup.Kv2kv(ctx, from, to, nil, logger)
108108
if err != nil && !errors.Is(err, context.Canceled) {
109109
if !errors.Is(err, context.Canceled) {
110110
logger.Error(err.Error())

cmd/integration/commands/reset_state.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ var cmdClearBadBlocks = &cobra.Command{
106106
defer db.Close()
107107

108108
return db.Update(ctx, func(tx kv.RwTx) error {
109-
return backup.ClearTables(ctx, tx, kv.BadHeaderNumber)
109+
return backup.ClearTables(ctx, db, tx, kv.BadHeaderNumber)
110110
})
111111
},
112112
}

cmd/integration/commands/stages.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -546,7 +546,7 @@ func stageSenders(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) er
546546

547547
br, _ := blocksIO(db, logger)
548548
if reset {
549-
return db.Update(ctx, func(tx kv.RwTx) error { return rawdbreset.ResetSenders(ctx, tx) })
549+
return db.Update(ctx, func(tx kv.RwTx) error { return rawdbreset.ResetSenders(ctx, db, tx) })
550550
}
551551

552552
tx, err := db.BeginRw(ctx)
@@ -810,6 +810,20 @@ func stageExec(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error
810810
return err
811811
}
812812
doms.ClearRam(true)
813+
if !noCommit {
814+
if err := tx.Commit(); err != nil {
815+
return err
816+
}
817+
818+
// no-async
819+
if err := agg.BuildFiles(agg.EndTxNumMinimax() + agg.StepSize()); err != nil {
820+
return err
821+
}
822+
823+
if tx, err = db.BeginTemporalRw(ctx); err != nil {
824+
return err
825+
}
826+
}
813827

814828
pruneStage, err := sync.PruneStageState(stages.Execution, s.BlockNumber, tx, s.CurrentSyncCycle.IsInitialCycle)
815829
if err != nil {

common/dbg/experiments.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ import (
3838
var (
3939
MaxReorgDepth = EnvUint("MAX_REORG_DEPTH", 96)
4040

41+
// WarmupTableWorkers bounds the parallel scans of a single WarmupTable call;
42+
// 0 (the default) disables table warmup entirely.
43+
WarmupTableWorkers = EnvUint("WARMUP_TABLE_WORKERS", 0)
44+
4145
saveHeapProfile = EnvBool("SAVE_HEAP_PROFILE", false)
4246
heapProfileFilePath = EnvString("HEAP_PROFILE_FILE_PATH", "")
4347
heapProfileThresholdPercent = EnvUint("HEAP_PROFILE_THRESHOLD", 35)

db/kv/backup/backup.go

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func OpenPair(from, to string, label kv.Label, targetPageSize datasize.ByteSize,
5858
return src, dst
5959
}
6060

61-
func Kv2kv(ctx context.Context, src kv.RoDB, dst kv.RwDB, tables []string, readAheadThreads int, logger log.Logger) error {
61+
func Kv2kv(ctx context.Context, src kv.RoDB, dst kv.RwDB, tables []string, logger log.Logger) error {
6262
srcTx, err1 := src.BeginRo(ctx)
6363
if err1 != nil {
6464
return err1
@@ -83,14 +83,33 @@ func Kv2kv(ctx context.Context, src kv.RoDB, dst kv.RwDB, tables []string, readA
8383
if b.IsDeprecated {
8484
continue
8585
}
86-
if err := backupTable(ctx, srcTx, dst, name, logEvery, logger); err != nil {
86+
if err := warmupWhile(ctx, src, name, func() error {
87+
return backupTable(ctx, srcTx, dst, name, logEvery, logger)
88+
}); err != nil {
8789
return err
8890
}
8991
}
9092
logger.Info("done")
9193
return nil
9294
}
9395

96+
// warmupWhile warms `table` into RAM in the background while `fn` runs, and
97+
// cancels the warmup as soon as `fn` returns — bounds warmup to one table at a
98+
// time and keeps fn's reads off disk on chaindata >> RAM.
99+
func warmupWhile(ctx context.Context, db kv.RoDB, table string, fn func() error) error {
100+
warmupCtx, cancel := context.WithCancel(ctx)
101+
done := make(chan struct{})
102+
go func() {
103+
defer close(done)
104+
db.WarmupTable(warmupCtx, table)
105+
}()
106+
defer func() {
107+
cancel()
108+
<-done
109+
}()
110+
return fn()
111+
}
112+
94113
func backupTable(ctx context.Context, srcTx kv.Tx, dst kv.RwDB, table string, logEvery *time.Ticker, logger log.Logger) error {
95114
var total uint64
96115
srcC, err := srcTx.Cursor(table)
@@ -149,6 +168,11 @@ func backupTable(ctx context.Context, srcTx kv.Tx, dst kv.RwDB, table string, lo
149168
}
150169
}
151170
}
171+
172+
// TODO: Unwind doesn't need to decrement the auto-increment sequence — it's
173+
// not exposed to users and not part of consensus, so we could switch to
174+
// mdbx's native Sequence.
175+
152176
// migrate bucket sequences to native mdbx implementation
153177
//currentID, err := srcTx.Sequence(name, 0)
154178
//if err != nil {
@@ -164,12 +188,10 @@ func backupTable(ctx context.Context, srcTx kv.Tx, dst kv.RwDB, table string, lo
164188
return nil
165189
}
166190

167-
const ReadAheadThreads = 2048
168-
169-
func ClearTables(ctx context.Context, tx kv.RwTx, tables ...string) error {
191+
func ClearTables(ctx context.Context, db kv.RoDB, tx kv.RwTx, tables ...string) error {
170192
for _, tbl := range tables {
171193
log.Info("Clear", "table", tbl)
172-
if err := tx.ClearTable(tbl); err != nil {
194+
if err := warmupWhile(ctx, db, tbl, func() error { return tx.ClearTable(tbl) }); err != nil {
173195
return err
174196
}
175197
}

db/kv/helpers.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ func (g *gatedRoDB) AllTables() TableCfg { return g.inner.Al
6262
func (g *gatedRoDB) PageSize() datasize.ByteSize { return g.inner.PageSize() }
6363
func (g *gatedRoDB) CHandle() unsafe.Pointer { return g.inner.CHandle() }
6464
func (g *gatedRoDB) Path() string { return g.inner.Path() }
65+
66+
// WarmupTable is a no-op: warmup spawns long-lived read scans, exactly the
67+
// freelist-pinning readers the gate exists to keep off the commit path.
68+
func (g *gatedRoDB) WarmupTable(ctx context.Context, table string) {}
6569
func (g *gatedRoDB) View(ctx context.Context, f func(tx Tx) error) error {
6670
g.gate.RLock()
6771
defer g.gate.RUnlock()

db/kv/kv_interface.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,9 @@ type RoDB interface {
122122
AllTables() TableCfg
123123
PageSize() datasize.ByteSize
124124

125+
// WarmupTable pulls a single table into the OS page-cache via parallel scans.
126+
WarmupTable(ctx context.Context, table string)
127+
125128
// CHandle pointer to the underlying C environment handle, if applicable (e.g. *C.MDBX_env)
126129
CHandle() unsafe.Pointer
127130
Path() string
@@ -582,6 +585,10 @@ type TemporalRoDB interface {
582585
ViewTemporal(ctx context.Context, f func(tx TemporalTx) error) error
583586
BeginTemporalRo(ctx context.Context) (TemporalTx, error)
584587
Debug() TemporalDebugDB
588+
589+
// Warmup pulls the whole DB into RAM (mdbx env warmup). force re-touches
590+
// pages already resident; non-force only prefetches missing ones.
591+
Warmup(ctx context.Context, force bool) error
585592
}
586593
type TemporalRwDB interface {
587594
RwDB

db/kv/mdbx/kv_mdbx.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/c2h5oh/datasize"
3737
"github.com/erigontech/mdbx-go/mdbx"
3838
stack2 "github.com/go-stack/stack"
39+
"golang.org/x/sync/errgroup"
3940
"golang.org/x/sync/semaphore"
4041

4142
"github.com/erigontech/erigon/common/dbg"
@@ -882,6 +883,117 @@ func (db *MdbxKV) View(ctx context.Context, f func(tx kv.Tx) error) (err error)
882883
return f(tx)
883884
}
884885

886+
func (db *MdbxKV) WarmupTable(ctx context.Context, bucket string) {
887+
workers := int(dbg.WarmupTableWorkers)
888+
if workers <= 0 {
889+
return
890+
}
891+
const lvl = log.LvlInfo
892+
var total uint64
893+
if err := db.View(ctx, func(tx kv.Tx) error {
894+
var err error
895+
total, err = tx.Count(bucket)
896+
return err
897+
}); err != nil {
898+
db.log.Warn("[warmup] count failed", "table", bucket, "err", err)
899+
}
900+
if total == 0 {
901+
return
902+
}
903+
progress := atomic.Int64{}
904+
905+
logEvery := time.NewTicker(20 * time.Second)
906+
defer logEvery.Stop()
907+
908+
g, ctx := errgroup.WithContext(ctx)
909+
g.SetLimit(workers)
910+
for i := 0; i < 256; i++ {
911+
for j := 0; j < 256; j++ {
912+
i := i
913+
j := j
914+
g.Go(func() error {
915+
return db.View(ctx, func(tx kv.Tx) error {
916+
it, err := tx.Prefix(bucket, []byte{byte(i), byte(j)})
917+
if err != nil {
918+
return err
919+
}
920+
defer it.Close()
921+
kNum := 0
922+
for it.HasNext() {
923+
k, v, err := it.Next()
924+
if err != nil {
925+
return err
926+
}
927+
if len(k) > 0 {
928+
_, _ = k[0], k[len(k)-1]
929+
}
930+
if len(v) > 0 {
931+
_, _ = v[0], v[len(v)-1]
932+
}
933+
progress.Add(1)
934+
935+
kNum++
936+
if kNum%1024 == 0 { // a bit reduce runtime cost
937+
select {
938+
case <-ctx.Done():
939+
return ctx.Err()
940+
case <-logEvery.C:
941+
db.log.Log(lvl, fmt.Sprintf("[warmup] Progress: %s %.2f%%", bucket, min(100, 100*float64(progress.Load())/float64(total))))
942+
default:
943+
}
944+
}
945+
}
946+
return nil
947+
})
948+
})
949+
}
950+
}
951+
for i := 0; i < 1_000; i++ {
952+
i := i
953+
g.Go(func() error {
954+
return db.View(ctx, func(tx kv.Tx) error {
955+
seek := make([]byte, 8)
956+
binary.BigEndian.PutUint64(seek, uint64(i*100_000))
957+
it, err := tx.Prefix(bucket, seek)
958+
if err != nil {
959+
return err
960+
}
961+
defer it.Close()
962+
kNum := 0
963+
for it.HasNext() {
964+
k, v, err := it.Next()
965+
if err != nil {
966+
return err
967+
}
968+
if len(k) > 0 {
969+
_, _ = k[0], k[len(k)-1]
970+
}
971+
if len(v) > 0 {
972+
_, _ = v[0], v[len(v)-1]
973+
}
974+
progress.Add(1)
975+
976+
kNum++
977+
if kNum%1024 == 0 {
978+
select {
979+
case <-ctx.Done():
980+
return ctx.Err()
981+
case <-logEvery.C:
982+
db.log.Log(lvl, fmt.Sprintf("[warmup] Progress: %s %.2f%%", bucket, min(100, 100*float64(progress.Load())/float64(total))))
983+
default:
984+
}
985+
}
986+
}
987+
988+
return nil
989+
})
990+
})
991+
}
992+
if err := g.Wait(); err != nil && !errors.Is(err, context.Canceled) {
993+
db.log.Warn("[warmup] failed", "table", bucket, "err", err)
994+
}
995+
}
996+
885997
func (tx *MdbxTx) Apply(_ context.Context, f func(tx kv.Tx) error) (err error) {
886998
return f(tx)
887999
}

db/kv/membatchwithdb/memory_mutation.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1272,4 +1272,8 @@ func (td temporaldb) ViewTemporal(ctx context.Context, f func(tx kv.TemporalTx)
12721272
return f(td.memoryMutation)
12731273
}
12741274

1275+
func (td temporaldb) Warmup(ctx context.Context, force bool) error { return nil }
1276+
1277+
func (td temporaldb) WarmupTable(ctx context.Context, table string) {}
1278+
12751279
func (td temporaldb) Path() string { return "<mem>" }

db/kv/membatchwithdb/memory_store.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -431,12 +431,13 @@ func (db *memStoreDB) UpdateNosync(_ context.Context, f func(tx kv.RwTx) error)
431431
func (db *memStoreDB) View(_ context.Context, f func(tx kv.Tx) error) error {
432432
return f(db.store)
433433
}
434-
func (db *memStoreDB) Close() {}
435-
func (db *memStoreDB) AllTables() kv.TableCfg { return kv.ChaindataTablesCfg }
436-
func (db *memStoreDB) PageSize() datasize.ByteSize { return 4096 }
437-
func (db *memStoreDB) ReadOnly() bool { return false }
438-
func (db *memStoreDB) CHandle() unsafe.Pointer { return nil }
439-
func (db *memStoreDB) Path() string { return "<mem>" }
434+
func (db *memStoreDB) Close() {}
435+
func (db *memStoreDB) AllTables() kv.TableCfg { return kv.ChaindataTablesCfg }
436+
func (db *memStoreDB) PageSize() datasize.ByteSize { return 4096 }
437+
func (db *memStoreDB) WarmupTable(ctx context.Context, table string) {}
438+
func (db *memStoreDB) ReadOnly() bool { return false }
439+
func (db *memStoreDB) CHandle() unsafe.Pointer { return nil }
440+
func (db *memStoreDB) Path() string { return "<mem>" }
440441

441442
// --- Cursor ---
442443
//

0 commit comments

Comments
 (0)