Skip to content

Commit 6aea736

Browse files
committed
db/kv, db/state, execution: opt-in page-cache warmup + parallel read-ahead
1 parent 9e2e4d0 commit 6aea736

24 files changed

Lines changed: 725 additions & 116 deletions

cmd/integration/commands/refetence_db.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -104,7 +104,7 @@ var cmdMdbxToMdbx = &cobra.Command{
104104
ctx, _ := common.RootContext()
105105
logger := debug.SetupCobra(cmd, "integration")
106106
from, to := backup.OpenPair(chaindata, toChaindata, dbcfg.ChainDB, 0, logger)
107-
err := backup.Kv2kv(ctx, from, to, nil, backup.ReadAheadThreads, logger)
107+
err := backup.Kv2kv(ctx, from, to, nil, logger)
108108
if err != nil && !errors.Is(err, context.Canceled) {
109109
if !errors.Is(err, context.Canceled) {
110110
logger.Error(err.Error())

cmd/integration/commands/reset_state.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -106,7 +106,7 @@ var cmdClearBadBlocks = &cobra.Command{
106106
defer db.Close()
107107

108108
return db.Update(ctx, func(tx kv.RwTx) error {
109-
return backup.ClearTables(ctx, tx, kv.BadHeaderNumber)
109+
return backup.ClearTables(ctx, db, tx, kv.BadHeaderNumber)
110110
})
111111
},
112112
}

cmd/integration/commands/stages.go

Lines changed: 41 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -546,7 +546,7 @@ func stageSenders(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) er
546546

547547
br, _ := blocksIO(db, logger)
548548
if reset {
549-
return db.Update(ctx, func(tx kv.RwTx) error { return rawdbreset.ResetSenders(ctx, tx) })
549+
return db.Update(ctx, func(tx kv.RwTx) error { return rawdbreset.ResetSenders(ctx, db, tx) })
550550
}
551551

552552
tx, err := db.BeginRw(ctx)
@@ -710,7 +710,7 @@ func stageExec(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error
710710
if err != nil {
711711
return err
712712
}
713-
err = stagedsync.PruneExecutionStage(ctx, p, tx, cfg, 0, logger)
713+
_, err = stagedsync.PruneExecutionStage(ctx, p, tx, cfg, 0, logger)
714714
if err != nil {
715715
return err
716716
}
@@ -758,10 +758,13 @@ func stageExec(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error
758758
return err
759759
}
760760
}
761-
if err := doms.Commit(ctx, tx); err != nil {
761+
if err := doms.Flush(ctx, tx); err != nil {
762+
return err
763+
}
764+
doms.ClearRam(true)
765+
if err := tx.Commit(); err != nil {
762766
return err
763767
}
764-
doms.Close()
765768
if tx, err = db.BeginTemporalRw(ctx); err != nil {
766769
return err
767770
}
@@ -770,27 +773,22 @@ func stageExec(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error
770773
if err != nil {
771774
return err
772775
}
773-
if err := stagedsync.PruneExecutionStage(ctx, pruneStage, tx, cfg, 0, logger); err != nil {
776+
if _, err := stagedsync.PruneExecutionStage(ctx, pruneStage, tx, cfg, 0, logger); err != nil {
774777
return err
775778
}
776779

777780
if err := tx.Commit(); err != nil {
778781
return err
779782
}
780-
if bn+1 >= block { // last block committed; nothing more to run
781-
break
782-
}
783783
if tx, err = db.BeginTemporalRw(ctx); err != nil {
784784
return err
785785
}
786-
// Fresh SD for the next block: a committed SD is never reused.
787-
if doms, err = execctx.NewSharedDomains(ctx, tx, logger); err != nil {
788-
return err
789-
}
790-
doms.SetInMemHistoryReads(false)
791786
}
792-
doms.Close()
793-
return nil
787+
if err := doms.Flush(ctx, tx); err != nil {
788+
return err
789+
}
790+
doms.ClearRam(true)
791+
return tx.Commit()
794792
}
795793
agg := (db.(dbstate.HasAgg).Agg()).(*dbstate.Aggregator)
796794
blockSnapBuildSema := semaphore.NewWeighted(int64(runtime.NumCPU()))
@@ -809,25 +807,44 @@ func stageExec(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error
809807
if err := doms.Flush(ctx, tx); err != nil {
810808
return err
811809
}
810+
txNum := doms.TxNum()
812811
doms.ClearRam(true)
813-
814-
pruneStage, err := sync.PruneStageState(stages.Execution, s.BlockNumber, tx, s.CurrentSyncCycle.IsInitialCycle)
815-
if err != nil {
816-
return err
817-
}
818-
if err := stagedsync.PruneExecutionStage(ctx, pruneStage, tx, cfg, 0, logger); err != nil {
819-
return err
820-
}
821-
822812
if !noCommit {
823813
if err := tx.Commit(); err != nil {
824814
return err
825815
}
816+
817+
agg.BuildFilesInBackground(txNum)
818+
826819
if tx, err = db.BeginTemporalRw(ctx); err != nil {
827820
return err
828821
}
829822
}
830823

824+
// Prune in bounded, committed batches so a long prune survives Ctrl-C:
825+
// prune progress is durable, so a restart only repeats the last batch.
826+
for {
827+
pruneStage, err := sync.PruneStageState(stages.Execution, s.BlockNumber, tx, false)
828+
if err != nil {
829+
return err
830+
}
831+
hasMore, err := stagedsync.PruneExecutionStage(ctx, pruneStage, tx, cfg, time.Minute, logger)
832+
if err != nil {
833+
return err
834+
}
835+
if !noCommit {
836+
if err := tx.Commit(); err != nil {
837+
return err
838+
}
839+
if tx, err = db.BeginTemporalRw(ctx); err != nil {
840+
return err
841+
}
842+
}
843+
if !hasMore {
844+
break
845+
}
846+
}
847+
831848
if execProgress, err = stages.GetStageProgress(tx, stages.Execution); err != nil {
832849
return err
833850
}

common/dbg/experiments.go

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,10 @@ import (
3838
var (
3939
MaxReorgDepth = EnvUint("MAX_REORG_DEPTH", 96)
4040

41+
// WarmupTableWorkers bounds the parallel scans of a single WarmupTable call;
42+
// 0 (the default) disables table warmup entirely.
43+
WarmupTableWorkers = EnvUint("WARMUP_TABLE_WORKERS", 0)
44+
4145
saveHeapProfile = EnvBool("SAVE_HEAP_PROFILE", false)
4246
heapProfileFilePath = EnvString("HEAP_PROFILE_FILE_PATH", "")
4347
heapProfileThresholdPercent = EnvUint("HEAP_PROFILE_THRESHOLD", 35)

db/kv/backup/backup.go

Lines changed: 70 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -18,10 +18,10 @@ package backup
1818

1919
import (
2020
"context"
21-
"encoding/hex"
2221
"fmt"
2322
"maps"
2423
"runtime"
24+
"slices"
2525
"time"
2626

2727
"github.com/c2h5oh/datasize"
@@ -58,7 +58,7 @@ func OpenPair(from, to string, label kv.Label, targetPageSize datasize.ByteSize,
5858
return src, dst
5959
}
6060

61-
func Kv2kv(ctx context.Context, src kv.RoDB, dst kv.RwDB, tables []string, readAheadThreads int, logger log.Logger) error {
61+
func Kv2kv(ctx context.Context, src kv.RoDB, dst kv.RwDB, tables []string, logger log.Logger) error {
6262
srcTx, err1 := src.BeginRo(ctx)
6363
if err1 != nil {
6464
return err1
@@ -79,76 +79,122 @@ func Kv2kv(ctx context.Context, src kv.RoDB, dst kv.RwDB, tables []string, readA
7979
}
8080
}
8181

82-
for name, b := range tablesMap {
83-
if b.IsDeprecated {
82+
var copiedTables int
83+
var copiedRows uint64
84+
for _, name := range slices.Sorted(maps.Keys(tablesMap)) { // deterministic order for reproducible benchmarks
85+
if tablesMap[name].IsDeprecated {
8486
continue
8587
}
86-
if err := backupTable(ctx, srcTx, dst, name, logEvery, logger); err != nil {
88+
rows, err := backupTable(ctx, src, srcTx, dst, name, logEvery, logger)
89+
if err != nil {
8790
return err
8891
}
92+
if rows > 0 {
93+
copiedTables++
94+
copiedRows += rows
95+
}
8996
}
90-
logger.Info("done")
97+
logger.Info("done", "tablesWithData", copiedTables, "rows", common.PrettyCounter(copiedRows))
9198
return nil
9299
}
93100

94-
func backupTable(ctx context.Context, srcTx kv.Tx, dst kv.RwDB, table string, logEvery *time.Ticker, logger log.Logger) error {
95-
var total uint64
101+
// warmupWhile warms `table` into RAM in the background while `fn` runs, and
102+
// cancels the warmup as soon as `fn` returns — bounds warmup to one table at a
103+
// time and keeps fn's reads off disk on chaindata >> RAM.
104+
func warmupWhile(ctx context.Context, db kv.RoDB, table string, fn func() error) error {
105+
warmupCtx, cancel := context.WithCancel(ctx)
106+
done := make(chan struct{})
107+
go func() {
108+
defer close(done)
109+
db.WarmupTable(warmupCtx, table)
110+
}()
111+
defer func() {
112+
cancel()
113+
<-done
114+
}()
115+
return fn()
116+
}
117+
118+
func backupTable(ctx context.Context, src kv.RoDB, srcTx kv.Tx, dst kv.RwDB, table string, logEvery *time.Ticker, logger log.Logger) (uint64, error) {
119+
t := time.Now()
96120
srcC, err := srcTx.Cursor(table)
97121
if err != nil {
98-
return err
122+
return 0, err
99123
}
100124
defer srcC.Close()
101-
total, _ = srcTx.Count(table)
125+
total, _ := srcTx.Count(table)
126+
size, _ := srcTx.BucketSize(table)
127+
if total > 0 {
128+
logger.Info("[mdbx_to_mdbx] copying", "table", table, "rows", common.PrettyCounter(total), "size", common.ByteCount(size))
129+
}
130+
131+
// Parallel read-ahead: keep a bounded band of pages warm just ahead of the
132+
// copy cursor (cold page faults are slow; one reader can't saturate nvme).
133+
// No-op unless WARMUP_TABLE_WORKERS is set.
134+
var ra *kv.ReadAheader
135+
if workers := int(dbg.WarmupTableWorkers); workers > 0 && total > 0 {
136+
ra = kv.NewReadAheader(ctx, src, table, nil, workers)
137+
}
138+
defer ra.Close()
102139

103140
if err := dst.Update(ctx, func(tx kv.RwTx) error {
104141
return tx.ClearTable(table)
105142
}); err != nil {
106-
return err
143+
return 0, err
107144
}
108145
dstTx, err := dst.BeginRw(ctx)
109146
if err != nil {
110-
return err
147+
return 0, err
111148
}
112149
defer dstTx.Rollback()
113150

114151
c, err := dstTx.RwCursor(table)
115152
if err != nil {
116-
return err
153+
return 0, err
117154
}
118155
defer c.Close()
119156
casted, isDupsort := c.(kv.RwCursorDupSort)
120157
i := uint64(0)
121158

122159
for k, v, err := srcC.First(); k != nil; k, v, err = srcC.Next() {
123160
if err != nil {
124-
return err
161+
return 0, err
162+
}
163+
if i%1024 == 0 {
164+
ra.SetPos(k)
125165
}
126166

127167
if isDupsort {
128168
if err = casted.AppendDup(k, v); err != nil {
129-
return err
169+
return 0, err
130170
}
131171
} else {
132172
if err = c.Append(k, v); err != nil {
133-
return err
173+
return 0, err
134174
}
135175
}
136176

137177
i++
138178
if i%100_000 == 0 {
139179
select {
140180
case <-ctx.Done():
141-
return ctx.Err()
181+
return 0, ctx.Err()
142182
case <-logEvery.C:
143183
var m runtime.MemStats
144184
dbg.ReadMemStats(&m)
145185
logger.Info("Progress", "table", table, "progress",
146-
fmt.Sprintf("%s/%s", common.PrettyCounter(i), common.PrettyCounter(total)), "key", hex.EncodeToString(k),
186+
fmt.Sprintf("%s/%s", common.PrettyCounter(i), common.PrettyCounter(total)),
187+
"size", common.ByteCount(size), "keys/s", uint64(float64(i)/time.Since(t).Seconds()),
147188
"alloc", common.ByteCount(m.Alloc), "sys", common.ByteCount(m.Sys))
148189
default:
149190
}
150191
}
151192
}
193+
194+
// TODO: Unwind doesn't need to decrement the auto-increment sequence — it's
195+
// not exposed to users and not part of consensus, so we could switch to
196+
// mdbx's native Sequence.
197+
152198
// migrate bucket sequences to native mdbx implementation
153199
//currentID, err := srcTx.Sequence(name, 0)
154200
//if err != nil {
@@ -159,17 +205,17 @@ func backupTable(ctx context.Context, srcTx kv.Tx, dst kv.RwDB, table string, lo
159205
// return err
160206
//}
161207
if err2 := dstTx.Commit(); err2 != nil {
162-
return err2
208+
return 0, err2
163209
}
164-
return nil
210+
return i, nil
165211
}
166212

167-
const ReadAheadThreads = 2048
168-
169-
func ClearTables(ctx context.Context, tx kv.RwTx, tables ...string) error {
213+
func ClearTables(ctx context.Context, db kv.RoDB, tx kv.RwTx, tables ...string) error {
170214
for _, tbl := range tables {
171215
log.Info("Clear", "table", tbl)
172-
if err := tx.ClearTable(tbl); err != nil {
216+
if err := warmupWhile(ctx, db, tbl, func() error {
217+
return tx.ClearTable(tbl)
218+
}); err != nil {
173219
return err
174220
}
175221
}

db/kv/helpers.go

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -62,6 +62,10 @@ func (g *gatedRoDB) AllTables() TableCfg { return g.inner.Al
6262
func (g *gatedRoDB) PageSize() datasize.ByteSize { return g.inner.PageSize() }
6363
func (g *gatedRoDB) CHandle() unsafe.Pointer { return g.inner.CHandle() }
6464
func (g *gatedRoDB) Path() string { return g.inner.Path() }
65+
66+
// WarmupTable is a no-op: warmup spawns long-lived read scans, exactly the
67+
// freelist-pinning readers the gate exists to keep off the commit path.
68+
func (g *gatedRoDB) WarmupTable(ctx context.Context, table string) {}
6569
func (g *gatedRoDB) View(ctx context.Context, f func(tx Tx) error) error {
6670
g.gate.RLock()
6771
defer g.gate.RUnlock()

db/kv/kv_interface.go

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -122,6 +122,9 @@ type RoDB interface {
122122
AllTables() TableCfg
123123
PageSize() datasize.ByteSize
124124

125+
// WarmupTable pulls a single table into the OS page-cache via parallel scans.
126+
WarmupTable(ctx context.Context, table string)
127+
125128
// CHandle pointer to the underlying C environment handle, if applicable (e.g. *C.MDBX_env)
126129
CHandle() unsafe.Pointer
127130
Path() string

0 commit comments

Comments
 (0)