Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/crlib/testutils/leaktest"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/batchrepr"
Expand All @@ -32,6 +33,7 @@ import (
)

func TestBatch(t *testing.T) {
defer leaktest.AfterTest(t)()
testBatch(t, 0)
testBatch(t, defaultBatchInitialSize)
}
Expand Down Expand Up @@ -202,6 +204,7 @@ func testBatch(t *testing.T, size int) {
}

func TestBatchPreAlloc(t *testing.T) {
defer leaktest.AfterTest(t)()
var cases = []struct {
size int
exp int
Expand All @@ -220,6 +223,7 @@ func TestBatchPreAlloc(t *testing.T) {
}

func TestBatchIngestSST(t *testing.T) {
defer leaktest.AfterTest(t)()
// Verify that Batch.IngestSST has the correct batch count and memtable
// size.
var b Batch
Expand All @@ -232,6 +236,7 @@ func TestBatchIngestSST(t *testing.T) {
}

func TestBatchLen(t *testing.T) {
defer leaktest.AfterTest(t)()
var b Batch

requireLenAndReprEq := func(size int) {
Expand All @@ -256,6 +261,7 @@ func TestBatchLen(t *testing.T) {
}

func TestBatchEmpty(t *testing.T) {
defer leaktest.AfterTest(t)()
testBatchEmpty(t, 0)
testBatchEmpty(t, defaultBatchInitialSize)
testBatchEmpty(t, 0, WithInitialSizeBytes(2<<10), WithMaxRetainedSizeBytes(2<<20))
Expand Down Expand Up @@ -323,6 +329,7 @@ func testBatchEmpty(t *testing.T, size int, opts ...BatchOption) {
}

func TestBatchApplyNoSyncWait(t *testing.T) {
defer leaktest.AfterTest(t)()
db, err := Open("", &Options{
FS: vfs.NewMem(),
Logger: testutils.Logger{T: t},
Expand Down Expand Up @@ -350,6 +357,7 @@ func TestBatchApplyNoSyncWait(t *testing.T) {
}

func TestBatchReset(t *testing.T) {
defer leaktest.AfterTest(t)()
db, err := Open("", &Options{
FS: vfs.NewMem(),
Logger: testutils.Logger{T: t},
Expand Down Expand Up @@ -422,11 +430,13 @@ func TestBatchReset(t *testing.T) {
}

func TestBatchReuse(t *testing.T) {
defer leaktest.AfterTest(t)()
db, err := Open("", &Options{
FS: vfs.NewMem(),
Logger: testutils.Logger{T: t},
})
require.NoError(t, err)
defer db.Close()

var buf bytes.Buffer
batches := map[string]*Batch{}
Expand Down Expand Up @@ -504,6 +514,7 @@ func TestBatchReuse(t *testing.T) {
}

func TestIndexedBatchReset(t *testing.T) {
defer leaktest.AfterTest(t)()
indexCount := func(sl *batchskl.Skiplist) int {
count := 0
iter := sl.NewIter(nil, nil)
Expand Down Expand Up @@ -597,6 +608,7 @@ func TestIndexedBatchReset(t *testing.T) {
// TestIndexedBatchMutation tests mutating an indexed batch with an open
// iterator.
func TestIndexedBatchMutation(t *testing.T) {
defer leaktest.AfterTest(t)()
opts := &Options{
Comparer: testkeys.Comparer,
FS: vfs.NewMem(),
Expand Down Expand Up @@ -704,6 +716,7 @@ func TestIndexedBatchMutation(t *testing.T) {
}

func TestIndexedBatch_GlobalVisibility(t *testing.T) {
defer leaktest.AfterTest(t)()
opts := &Options{
FS: vfs.NewMem(),
FormatMajorVersion: internalFormatNewest,
Expand Down Expand Up @@ -762,6 +775,7 @@ func TestIndexedBatch_GlobalVisibility(t *testing.T) {
}

func TestFlushableBatchReset(t *testing.T) {
defer leaktest.AfterTest(t)()
var b Batch
var err error
b.flushable, err = newFlushableBatch(&b, DefaultComparer)
Expand All @@ -772,6 +786,7 @@ func TestFlushableBatchReset(t *testing.T) {
}

func TestBatchIncrement(t *testing.T) {
defer leaktest.AfterTest(t)()
testCases := []uint32{
0x00000000,
0x00000001,
Expand Down Expand Up @@ -836,6 +851,7 @@ func TestBatchIncrement(t *testing.T) {
}

func TestBatchOpDoesIncrement(t *testing.T) {
defer leaktest.AfterTest(t)()
var b Batch
key := []byte("foo")
value := []byte("bar")
Expand Down Expand Up @@ -878,6 +894,7 @@ func TestBatchOpDoesIncrement(t *testing.T) {
}

func TestBatchGet(t *testing.T) {
defer leaktest.AfterTest(t)()
testCases := []struct {
method string
memTableSize uint64
Expand Down Expand Up @@ -949,6 +966,7 @@ func TestBatchGet(t *testing.T) {
}

func TestBatchIter(t *testing.T) {
defer leaktest.AfterTest(t)()
var b *Batch

for _, method := range []string{"build", "apply"} {
Expand Down Expand Up @@ -1014,6 +1032,7 @@ func TestBatchIter(t *testing.T) {
}

func TestBatchRangeOps(t *testing.T) {
defer leaktest.AfterTest(t)()
var b *Batch

datadriven.RunTest(t, "testdata/batch_range_ops", func(t *testing.T, td *datadriven.TestData) string {
Expand Down Expand Up @@ -1092,6 +1111,7 @@ func TestBatchRangeOps(t *testing.T) {
}

func TestBatchTooLarge(t *testing.T) {
defer leaktest.AfterTest(t)()
var b Batch
var result interface{}
func() {
Expand All @@ -1106,6 +1126,7 @@ func TestBatchTooLarge(t *testing.T) {
}

func TestFlushableBatchIter(t *testing.T) {
defer leaktest.AfterTest(t)()
var b *flushableBatch
datadriven.RunTest(t, "testdata/internal_iter_next", func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
Expand Down Expand Up @@ -1134,6 +1155,7 @@ func TestFlushableBatchIter(t *testing.T) {
}

func TestFlushableBatch(t *testing.T) {
defer leaktest.AfterTest(t)()
var b *flushableBatch
datadriven.RunTest(t, "testdata/flushable_batch", func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
Expand Down Expand Up @@ -1222,6 +1244,7 @@ func TestFlushableBatch(t *testing.T) {
}

func TestFlushableBatchDeleteRange(t *testing.T) {
defer leaktest.AfterTest(t)()
var fb *flushableBatch
var input string

Expand Down Expand Up @@ -1283,6 +1306,7 @@ func scanKeyspanIterator(w io.Writer, ki keyspan.FragmentIterator) {
}

func TestEmptyFlushableBatch(t *testing.T) {
defer leaktest.AfterTest(t)()
// Verify that we can create a flushable batch on an empty batch.
fb, err := newFlushableBatch(newBatch(nil), DefaultComparer)
require.NoError(t, err)
Expand All @@ -1291,6 +1315,7 @@ func TestEmptyFlushableBatch(t *testing.T) {
}

func TestBatchCommitStats(t *testing.T) {
defer leaktest.AfterTest(t)()
testFunc := func() error {
db, err := Open("", &Options{
FS: vfs.NewMem(),
Expand Down Expand Up @@ -1436,6 +1461,7 @@ func TestBatchCommitStats(t *testing.T) {
// TestBatchLogDataMemtableSize tests that LogDatas never contribute to memtable
// size.
func TestBatchLogDataMemtableSize(t *testing.T) {
defer leaktest.AfterTest(t)()
// Create a batch with Set("foo", "bar") and a LogData. Only the Set should
// contribute to the batch's memtable size.
b := Batch{}
Expand Down Expand Up @@ -1581,6 +1607,7 @@ func BenchmarkIndexedBatchSetDeferred(b *testing.B) {
}

func TestBatchMemTableSizeOverflow(t *testing.T) {
defer leaktest.AfterTest(t)()
opts := testingRandomized(t, &Options{
FS: vfs.NewMem(),
})
Expand All @@ -1605,6 +1632,7 @@ func TestBatchMemTableSizeOverflow(t *testing.T) {
// TestBatchSpanCaching stress tests the caching of keyspan.Spans for range
// tombstones and range keys.
func TestBatchSpanCaching(t *testing.T) {
defer leaktest.AfterTest(t)()
opts := &Options{
Comparer: testkeys.Comparer,
FS: vfs.NewMem(),
Expand Down Expand Up @@ -1702,6 +1730,7 @@ func TestBatchSpanCaching(t *testing.T) {
}

func TestBatchOption(t *testing.T) {
defer leaktest.AfterTest(t)()
for _, tc := range []struct {
name string
opts []BatchOption
Expand Down
12 changes: 11 additions & 1 deletion blob_rewrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"github.com/cockroachdb/crlib/crstrings"
"github.com/cockroachdb/crlib/testutils/leaktest"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/blobtest"
Expand All @@ -35,6 +36,7 @@ import (
)

func TestBlobRewrite(t *testing.T) {
defer leaktest.AfterTest(t)()
var (
bv blobtest.Values
vs compact.ValueSeparation
Expand All @@ -56,6 +58,7 @@ func TestBlobRewrite(t *testing.T) {
FS: fs,
})
require.NoError(t, err)
defer objStore.Close()

initRawWriter := func() {
if tw != nil {
Expand Down Expand Up @@ -172,13 +175,19 @@ func TestBlobRewrite(t *testing.T) {
}

fileCache := NewFileCache(1, 100)
defer fileCache.Unref()
blockCache := NewCache(1024)
defer blockCache.Unref()
blockCacheHandle := blockCache.NewHandle()
defer blockCacheHandle.Close()
mockFC := fileCache.newHandle(
nil,
blockCacheHandle,
objStore,
&base.LoggerWithNoopTracer{Logger: base.DefaultLogger},
sstable.ReaderOptions{},
func(base.ObjectInfo, error) error { return nil },
)
defer mockFC.Close()
var sstables []*manifest.TableMetadata
for _, sstFileNum := range sstableFileNums {
sst := &manifest.TableMetadata{
Expand Down Expand Up @@ -229,6 +238,7 @@ func TestBlobRewrite(t *testing.T) {
// sstables as extant references. Each blob rewrite may rewrite the original
// blob file, or one of the previous iteration's rewritten blob files.
func TestBlobRewriteRandomized(t *testing.T) {
defer leaktest.AfterTest(t)()
const numKVs = 1000
const blobFileID = 100000
const numRewrites = 10
Expand Down
6 changes: 6 additions & 0 deletions checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sync"
"testing"

"github.com/cockroachdb/crlib/testutils/leaktest"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/testutils"
Expand Down Expand Up @@ -248,6 +249,7 @@ func testCheckpointImpl(t *testing.T, ddFile string, createOnShared bool) {
}

func TestCopyCheckpointOptions(t *testing.T) {
defer leaktest.AfterTest(t)()
fs := vfs.NewMem()
datadriven.RunTest(t, "testdata/copy_checkpoint_options", func(t *testing.T, td *datadriven.TestData) string {
switch td.Cmd {
Expand Down Expand Up @@ -275,6 +277,7 @@ func TestCopyCheckpointOptions(t *testing.T) {
}

func TestCheckpoint(t *testing.T) {
defer leaktest.AfterTest(t)()
t.Run("shared=false", func(t *testing.T) {
testCheckpointImpl(t, "testdata/checkpoint", false /* createOnShared */)
})
Expand All @@ -287,6 +290,7 @@ func TestCheckpoint(t *testing.T) {
}

func TestCheckpointCompaction(t *testing.T) {
defer leaktest.AfterTest(t)()
fs := vfs.NewMem()
d, err := Open("", &Options{FS: fs, Logger: testutils.Logger{T: t}})
require.NoError(t, err)
Expand Down Expand Up @@ -369,6 +373,7 @@ func TestCheckpointCompaction(t *testing.T) {
}

func TestCheckpointFlushWAL(t *testing.T) {
defer leaktest.AfterTest(t)()
const checkpointPath = "checkpoints/checkpoint"
fs := vfs.NewCrashableMem()
opts := &Options{FS: fs, Logger: testutils.Logger{T: t}}
Expand Down Expand Up @@ -422,6 +427,7 @@ func TestCheckpointFlushWAL(t *testing.T) {
}

func TestCheckpointManyFiles(t *testing.T) {
defer leaktest.AfterTest(t)()
if testing.Short() {
t.Skip("skipping because of short flag")
}
Expand Down
7 changes: 7 additions & 0 deletions commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/crlib/testutils/leaktest"
"github.com/cockroachdb/pebble/internal/arenaskl"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/buildtags"
Expand Down Expand Up @@ -61,6 +62,7 @@ func (e *testCommitEnv) write(b *Batch, wg *sync.WaitGroup, _ *error) (*memTable
}

func TestCommitQueue(t *testing.T) {
defer leaktest.AfterTest(t)()
var q commitQueue
var batches [16]Batch
for i := range batches {
Expand All @@ -85,6 +87,7 @@ func TestCommitQueue(t *testing.T) {
}

func TestCommitPipeline(t *testing.T) {
defer leaktest.AfterTest(t)()
var e testCommitEnv
p := newCommitPipeline(e.env())

Expand Down Expand Up @@ -125,6 +128,7 @@ func TestCommitPipeline(t *testing.T) {
}

func TestCommitPipelineSync(t *testing.T) {
defer leaktest.AfterTest(t)()
n := 10000
if invariants.RaceEnabled {
// Under race builds we have to limit the concurrency or we hit the
Expand Down Expand Up @@ -172,6 +176,7 @@ func TestCommitPipelineSync(t *testing.T) {
}

func TestCommitPipelineAllocateSeqNum(t *testing.T) {
defer leaktest.AfterTest(t)()
var e testCommitEnv
p := newCommitPipeline(e.env())

Expand Down Expand Up @@ -220,6 +225,7 @@ func (f *syncDelayFile) Sync() error {
}

func TestCommitPipelineWALClose(t *testing.T) {
defer leaktest.AfterTest(t)()
// This test stresses the edge case of N goroutines blocked in the
// commitPipeline waiting for the log to sync when we concurrently decide to
// rotate and close the log.
Expand Down Expand Up @@ -302,6 +308,7 @@ func TestCommitPipelineWALClose(t *testing.T) {
// also grabbed the first seqnum = 5 before this batch, will ratchet to 5 + 0,
// which is a noop.
func TestCommitPipelineLogDataSeqNum(t *testing.T) {
defer leaktest.AfterTest(t)()
var testEnv commitEnv
testEnv = commitEnv{
logSeqNum: new(base.AtomicSeqNum),
Expand Down
Loading
Loading