Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions value.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ func (vlog *valueLog) rewrite(f *logFile) error {
vlog.opt.Debugf("Processing entry %d", count)
}

if isDeletedOrExpired(e.meta, e.ExpiresAt) {
return nil
}

vs, err := vlog.db.get(e.Key)
if err != nil {
return err
Expand Down Expand Up @@ -1037,9 +1041,6 @@ func discardEntry(e Entry, vs y.ValueStruct, db *DB) bool {
// Version not found. Discard.
return true
}
if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
return true
}
if (vs.Meta & bitValuePointer) == 0 {
// Key also stores the value in LSM. Discard.
return true
Expand Down
150 changes: 150 additions & 0 deletions value_gc_rewrite_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
* SPDX-License-Identifier: Apache-2.0
*/

package badger
Comment thread
matthewmcneely marked this conversation as resolved.

import (
"bytes"
"fmt"
"io"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
)

// Fixture sizing for the value-log GC rewrite benchmark: enough expired
// entries to fill the oldest vlog file(s), with live entries appended after.
const (
	benchmarkRewriteExpiredKeyCount = 8192
	benchmarkRewriteLiveKeyCount    = 4096
	benchmarkRewriteValueSize       = 2 << 10 // 2 KiB per value, forces values into the vlog
	benchmarkRewriteValueLogFileSize = 8 << 20 // 8 MiB per vlog file, so the fixture spans several files
	benchmarkRewriteValueThreshold  = 128
	benchmarkRewriteEntryExpiry     uint64 = 1 // timestamp far in the past, so entries are already expired
)

// benchmarkValueGCRewriteOptions returns test Options tuned for the GC
// rewrite benchmark: small vlog files and tables, no background compactors,
// and metrics disabled so the timed work is the rewrite itself.
func benchmarkValueGCRewriteOptions(dir string) Options {
	opts := getTestOptions(dir)
	opts.ValueLogFileSize = benchmarkRewriteValueLogFileSize
	opts.BaseTableSize = 1 << 20
	opts.BaseLevelSize = 4 << 20
	opts.ValueThreshold = benchmarkRewriteValueThreshold
	opts.NumCompactors = 0
	opts.MetricsEnabled = false
	return opts
}

// benchmarkWriteEntries writes count entries keyed "<prefix>-%08d" carrying
// the given value and expiry. Whenever the current transaction reports
// ErrTxnTooBig it is committed and a fresh one started; the entry that did
// not fit is retried in the next transaction.
func benchmarkWriteEntries(b *testing.B, db *DB, prefix string, count int, value []byte, expiresAt uint64) {
	b.Helper()

	next := 0
	for next < count {
		txn := db.NewTransaction(true)
		committed := false
		for next < count && !committed {
			e := NewEntry([]byte(fmt.Sprintf("%s-%08d", prefix, next)), value)
			e.ExpiresAt = expiresAt

			switch err := txn.SetEntry(e); err {
			case nil:
				next++
			case ErrTxnTooBig:
				// Flush what fits; the current entry is retried below.
				require.NoError(b, txn.Commit())
				committed = true
			default:
				require.NoError(b, err)
			}
		}
		if !committed {
			require.NoError(b, txn.Commit())
		}
	}
}

// benchmarkPrepareRewriteFixture populates dir with the benchmark database:
// expired entries are written first so they occupy the oldest vlog files,
// then live entries are appended. The DB is closed before returning so the
// directory can be copied per benchmark iteration.
func benchmarkPrepareRewriteFixture(b *testing.B, dir string) {
	b.Helper()

	db, err := Open(benchmarkValueGCRewriteOptions(dir))
	require.NoError(b, err)

	expired := bytes.Repeat([]byte("e"), benchmarkRewriteValueSize)
	live := bytes.Repeat([]byte("l"), benchmarkRewriteValueSize)

	// Order matters: expired before live puts all expired data up front.
	benchmarkWriteEntries(b, db, "expired", benchmarkRewriteExpiredKeyCount, expired, benchmarkRewriteEntryExpiry)
	benchmarkWriteEntries(b, db, "live", benchmarkRewriteLiveKeyCount, live, 0)

	require.NoError(b, db.Close())
}

func copyDir(src, dst string) error {
return filepath.Walk(src, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
relPath, err := filepath.Rel(src, path)
if err != nil {
return err
}
targetPath := filepath.Join(dst, relPath)
if info.IsDir() {
return os.MkdirAll(targetPath, info.Mode())
}
if !info.Mode().IsRegular() {
return fmt.Errorf("unsupported file mode for %s", path)
}

srcFile, err := os.Open(path)
if err != nil {
return err
}

dstFile, err := os.OpenFile(targetPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, info.Mode())
if err != nil {
_ = srcFile.Close()
return err
}
if _, err := io.Copy(dstFile, srcFile); err != nil {
_ = srcFile.Close()
_ = dstFile.Close()
return err
}
if err := srcFile.Close(); err != nil {
_ = dstFile.Close()
return err
}
return dstFile.Close()
})
}

// BenchmarkValueGCRewriteExpiredOnlyFile measures valueLog.rewrite on the
// oldest vlog file of a prepared fixture. Each iteration opens a fresh copy
// of the fixture directory so every rewrite sees identical input; only the
// rewrite call itself runs inside the timed region.
func BenchmarkValueGCRewriteExpiredOnlyFile(b *testing.B) {
	root := b.TempDir()
	fixture := filepath.Join(root, "fixture")
	benchmarkPrepareRewriteFixture(b, fixture)

	b.ReportAllocs()
	b.ResetTimer()

	for n := 0; n < b.N; n++ {
		b.StopTimer()

		workDir := filepath.Join(root, fmt.Sprintf("run-%03d", n))
		require.NoError(b, copyDir(fixture, workDir))

		db, err := Open(benchmarkValueGCRewriteOptions(workDir))
		require.NoError(b, err)

		// The fixture must span multiple vlog files so rewriting the
		// oldest one is meaningful.
		fids := db.vlog.sortedFids()
		require.Greater(b, len(fids), 1)

		db.vlog.filesLock.RLock()
		oldest := db.vlog.filesMap[fids[0]]
		db.vlog.filesLock.RUnlock()

		b.StartTimer()
		rewriteErr := db.vlog.rewrite(oldest)
		b.StopTimer()
		require.NoError(b, rewriteErr)

		require.NoError(b, db.Close())
		removeDir(workDir)
	}
}
83 changes: 83 additions & 0 deletions value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ package badger
import (
"bytes"
"errors"
"expvar"
"fmt"
"math"
"math/rand"
"os"
"reflect"
"sync"
"testing"
"time"

humanize "github.com/dustin/go-humanize"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -896,6 +898,87 @@ func (th *testHelper) writeRange(from, to int) {
}
}

// TestValueGCRewriteSkipsLSMGetOnlyForExpiredEntriesInMixedVlogFile checks
// that valueLog.rewrite avoids LSM lookups for entries that are already
// expired: in a vlog file mixing expired and live entries, only live entries
// should trigger a get, measured through the badger_get_num_user expvar
// counter. It then verifies the rewrite preserved all live data.
func TestValueGCRewriteSkipsLSMGetOnlyForExpiredEntriesInMixedVlogFile(t *testing.T) {
	dir := t.TempDir()

	opt := getTestOptions(dir)
	opt.ValueLogFileSize = 1 << 20
	opt.BaseTableSize = 1 << 15
	opt.ValueThreshold = 1 << 10

	kv, err := Open(opt)
	require.NoError(t, err)
	defer kv.Close()

	rng := rand.New(rand.NewSource(2))
	const mixedEntryCount = 16
	const mixedLiveEntryCount = mixedEntryCount / 2
	const extraLiveEntryCount = 1
	// Leave enough room for entry metadata so all mixed entries stay in one vlog file.
	valueSize := int(opt.ValueLogFileSize/int64(mixedEntryCount)) - 512
	values := make([][]byte, mixedEntryCount)
	for i := range values {
		values[i] = make([]byte, valueSize)
		_, err = rng.Read(values[i])
		require.NoError(t, err)
	}
	rotatingVal := make([]byte, 32<<10)
	_, err = rng.Read(rotatingVal)
	require.NoError(t, err)

	// Keep several expired and live entries in the same old vlog file:
	// even indices get a TTL one hour in the past (already expired),
	// odd indices stay live.
	require.NoError(t, kv.Update(func(txn *Txn) error {
		for i, val := range values {
			key := []byte(fmt.Sprintf("mixed-%02d", i))
			entry := NewEntry(key, val)
			if i%2 == 0 {
				entry = entry.WithTTL(-time.Hour)
			}
			if err := txn.SetEntry(entry); err != nil {
				return err
			}
		}
		return nil
	}))
	// One more large write pushes the log past ValueLogFileSize so the mixed
	// entries end up in a closed (old) vlog file that rewrite can process.
	require.NoError(t, kv.Update(func(txn *Txn) error {
		return txn.SetEntry(NewEntry([]byte("rotating"), rotatingVal))
	}))

	// Exactly two vlog files: the old mixed one and the active one.
	fids := kv.vlog.sortedFids()
	require.Len(t, fids, 2)

	kv.vlog.filesLock.RLock()
	lf := kv.vlog.filesMap[fids[0]]
	kv.vlog.filesLock.RUnlock()

	// Only the live entries should require an LSM lookup during rewrite.
	// Reset metrics so the counter below reflects the rewrite alone.
	clearAllMetrics()
	require.NoError(t, kv.vlog.rewrite(lf))

	totalGets := expvar.Get("badger_get_num_user")
	require.NotNil(t, totalGets)
	require.Equal(t, int64(mixedLiveEntryCount+extraLiveEntryCount), totalGets.(*expvar.Int).Value())

	// Post-rewrite data check: expired keys must be gone, live keys and the
	// rotating key must still return their original values.
	require.NoError(t, kv.View(func(txn *Txn) error {
		for i, want := range values {
			key := []byte(fmt.Sprintf("mixed-%02d", i))
			item, err := txn.Get(key)
			if i%2 == 0 {
				require.Equal(t, ErrKeyNotFound, err)
				continue
			}
			require.NoError(t, err)
			val := getItemValue(t, item)
			require.Equal(t, want, val)
		}
		item, err := txn.Get([]byte("rotating"))
		require.NoError(t, err)
		val := getItemValue(t, item)
		require.Equal(t, rotatingVal, val)
		return nil
	}))
}

func (th *testHelper) readRange(from, to int) {
for i := from; i <= to; i++ {
err := th.db.View(func(txn *Txn) error {
Expand Down
Loading