diff --git a/value.go b/value.go
index 77a023265..06620b23f 100644
--- a/value.go
+++ b/value.go
@@ -183,6 +183,10 @@ func (vlog *valueLog) rewrite(f *logFile) error {
 			vlog.opt.Debugf("Processing entry %d", count)
 		}
 
+		if isDeletedOrExpired(e.meta, e.ExpiresAt) {
+			return nil
+		}
+
 		vs, err := vlog.db.get(e.Key)
 		if err != nil {
 			return err
@@ -1037,9 +1041,6 @@ func discardEntry(e Entry, vs y.ValueStruct, db *DB) bool {
 		// Version not found. Discard.
 		return true
 	}
-	if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
-		return true
-	}
 	if (vs.Meta & bitValuePointer) == 0 {
 		// Key also stores the value in LSM. Discard.
 		return true
diff --git a/value_gc_rewrite_bench_test.go b/value_gc_rewrite_bench_test.go
new file mode 100644
index 000000000..c3095fc66
--- /dev/null
+++ b/value_gc_rewrite_bench_test.go
@@ -0,0 +1,150 @@
+/*
+ * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package badger
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	benchmarkRewriteExpiredKeyCount         = 8192
+	benchmarkRewriteLiveKeyCount            = 4096
+	benchmarkRewriteValueSize               = 2 << 10
+	benchmarkRewriteValueLogFileSize        = 8 << 20
+	benchmarkRewriteValueThreshold          = 128
+	benchmarkRewriteEntryExpiry      uint64 = 1
+)
+
+func benchmarkValueGCRewriteOptions(dir string) Options {
+	opt := getTestOptions(dir)
+	opt.ValueLogFileSize = benchmarkRewriteValueLogFileSize
+	opt.BaseTableSize = 1 << 20
+	opt.BaseLevelSize = 4 << 20
+	opt.ValueThreshold = benchmarkRewriteValueThreshold
+	opt.NumCompactors = 0
+	opt.MetricsEnabled = false
+	return opt
+}
+
+func benchmarkWriteEntries(b *testing.B, db *DB, prefix string, count int, value []byte, expiresAt uint64) {
+	b.Helper()
+
+	for i := 0; i < count; {
+		txn := db.NewTransaction(true)
+		for ; i < count; i++ {
+			entry := NewEntry([]byte(fmt.Sprintf("%s-%08d", prefix, i)), value)
+			entry.ExpiresAt = expiresAt
+
+			err := txn.SetEntry(entry)
+			if err == ErrTxnTooBig {
+				require.NoError(b, txn.Commit())
+				txn = nil
+				break
+			}
+			require.NoError(b, err)
+		}
+		if txn != nil {
+			require.NoError(b, txn.Commit())
+		}
+	}
+}
+
+func benchmarkPrepareRewriteFixture(b *testing.B, dir string) {
+	b.Helper()
+
+	db, err := Open(benchmarkValueGCRewriteOptions(dir))
+	require.NoError(b, err)
+
+	expiredValue := bytes.Repeat([]byte("e"), benchmarkRewriteValueSize)
+	liveValue := bytes.Repeat([]byte("l"), benchmarkRewriteValueSize)
+
+	// Write the expired entries first so they fill the earliest vlog files, then append the live entries.
+	benchmarkWriteEntries(b, db, "expired", benchmarkRewriteExpiredKeyCount, expiredValue, benchmarkRewriteEntryExpiry)
+	benchmarkWriteEntries(b, db, "live", benchmarkRewriteLiveKeyCount, liveValue, 0)
+
+	require.NoError(b, db.Close())
+}
+
+func copyDir(src, dst string) error {
+	return filepath.Walk(src, func(path string, info os.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+		relPath, err := filepath.Rel(src, path)
+		if err != nil {
+			return err
+		}
+		targetPath := filepath.Join(dst, relPath)
+		if info.IsDir() {
+			return os.MkdirAll(targetPath, info.Mode())
+		}
+		if !info.Mode().IsRegular() {
+			return fmt.Errorf("unsupported file mode for %s", path)
+		}
+
+		srcFile, err := os.Open(path)
+		if err != nil {
+			return err
+		}
+
+		dstFile, err := os.OpenFile(targetPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, info.Mode())
+		if err != nil {
+			_ = srcFile.Close()
+			return err
+		}
+		if _, err := io.Copy(dstFile, srcFile); err != nil {
+			_ = srcFile.Close()
+			_ = dstFile.Close()
+			return err
+		}
+		if err := srcFile.Close(); err != nil {
+			_ = dstFile.Close()
+			return err
+		}
+		return dstFile.Close()
+	})
+}
+
+func BenchmarkValueGCRewriteExpiredOnlyFile(b *testing.B) {
+	rootDir := b.TempDir()
+	fixtureDir := filepath.Join(rootDir, "fixture")
+	benchmarkPrepareRewriteFixture(b, fixtureDir)
+
+	b.ReportAllocs()
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		b.StopTimer()
+
+		runDir := filepath.Join(rootDir, fmt.Sprintf("run-%03d", i))
+		require.NoError(b, copyDir(fixtureDir, runDir))
+
+		db, err := Open(benchmarkValueGCRewriteOptions(runDir))
+		require.NoError(b, err)
+
+		fids := db.vlog.sortedFids()
+		require.Greater(b, len(fids), 1)
+
+		db.vlog.filesLock.RLock()
+		lf := db.vlog.filesMap[fids[0]]
+		db.vlog.filesLock.RUnlock()
+
+		b.StartTimer()
+		err = db.vlog.rewrite(lf)
+		b.StopTimer()
+		require.NoError(b, err)
+
+		require.NoError(b, db.Close())
+		removeDir(runDir)
+	}
+}
diff --git a/value_test.go b/value_test.go
index 2e7065644..374bce8f8 100644
--- a/value_test.go
+++ b/value_test.go
@@ -8,6 +8,7 @@ package badger
 import (
 	"bytes"
 	"errors"
+	"expvar"
 	"fmt"
 	"math"
 	"math/rand"
@@ -15,6 +16,7 @@ import (
 	"reflect"
 	"sync"
 	"testing"
+	"time"
 
 	humanize "github.com/dustin/go-humanize"
 	"github.com/stretchr/testify/require"
@@ -896,6 +898,87 @@ func (th *testHelper) writeRange(from, to int) {
 	}
 }
 
+func TestValueGCRewriteSkipsLSMGetOnlyForExpiredEntriesInMixedVlogFile(t *testing.T) {
+	dir := t.TempDir()
+
+	opt := getTestOptions(dir)
+	opt.ValueLogFileSize = 1 << 20
+	opt.BaseTableSize = 1 << 15
+	opt.ValueThreshold = 1 << 10
+
+	kv, err := Open(opt)
+	require.NoError(t, err)
+	defer kv.Close()
+
+	rng := rand.New(rand.NewSource(2))
+	const mixedEntryCount = 16
+	const mixedLiveEntryCount = mixedEntryCount / 2
+	const extraLiveEntryCount = 1
+	// Subtract 512 bytes per value as headroom for entry metadata so all mixed entries stay in one vlog file.
+	valueSize := int(opt.ValueLogFileSize/int64(mixedEntryCount)) - 512
+	values := make([][]byte, mixedEntryCount)
+	for i := range values {
+		values[i] = make([]byte, valueSize)
+		_, err = rng.Read(values[i])
+		require.NoError(t, err)
+	}
+	rotatingVal := make([]byte, 32<<10)
+	_, err = rng.Read(rotatingVal)
+	require.NoError(t, err)
+
+	// Write expired (even index, negative TTL) and live (odd index) entries together in the same old vlog file.
+	require.NoError(t, kv.Update(func(txn *Txn) error {
+		for i, val := range values {
+			key := []byte(fmt.Sprintf("mixed-%02d", i))
+			entry := NewEntry(key, val)
+			if i%2 == 0 {
+				entry = entry.WithTTL(-time.Hour)
+			}
+			if err := txn.SetEntry(entry); err != nil {
+				return err
+			}
+		}
+		return nil
+	}))
+	require.NoError(t, kv.Update(func(txn *Txn) error {
+		return txn.SetEntry(NewEntry([]byte("rotating"), rotatingVal))
+	}))
+
+	fids := kv.vlog.sortedFids()
+	require.Len(t, fids, 2)
+
+	kv.vlog.filesLock.RLock()
+	lf := kv.vlog.filesMap[fids[0]]
+	kv.vlog.filesLock.RUnlock()
+
+	// Only live entries should need an LSM lookup during rewrite: mixedLiveEntryCount+extraLiveEntryCount gets.
+	clearAllMetrics()
+	require.NoError(t, kv.vlog.rewrite(lf))
+
+	totalGets := expvar.Get("badger_get_num_user")
+	require.NotNil(t, totalGets)
+	require.Equal(t, int64(mixedLiveEntryCount+extraLiveEntryCount), totalGets.(*expvar.Int).Value())
+
+	require.NoError(t, kv.View(func(txn *Txn) error {
+		for i, want := range values {
+			key := []byte(fmt.Sprintf("mixed-%02d", i))
+			item, err := txn.Get(key)
+			if i%2 == 0 {
+				require.Equal(t, ErrKeyNotFound, err)
+				continue
+			}
+			require.NoError(t, err)
+			val := getItemValue(t, item)
+			require.Equal(t, want, val)
+		}
+		item, err := txn.Get([]byte("rotating"))
+		require.NoError(t, err)
+		val := getItemValue(t, item)
+		require.Equal(t, rotatingVal, val)
+		return nil
+	}))
+}
+
 func (th *testHelper) readRange(from, to int) {
 	for i := from; i <= to; i++ {
 		err := th.db.View(func(txn *Txn) error {