diff --git a/batch_test.go b/batch_test.go index dd7b9566fa9..4abfd289e4a 100644 --- a/batch_test.go +++ b/batch_test.go @@ -17,6 +17,7 @@ import ( "testing" "time" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/batchrepr" @@ -32,6 +33,7 @@ import ( ) func TestBatch(t *testing.T) { + defer leaktest.AfterTest(t)() testBatch(t, 0) testBatch(t, defaultBatchInitialSize) } @@ -202,6 +204,7 @@ func testBatch(t *testing.T, size int) { } func TestBatchPreAlloc(t *testing.T) { + defer leaktest.AfterTest(t)() var cases = []struct { size int exp int @@ -220,6 +223,7 @@ func TestBatchPreAlloc(t *testing.T) { } func TestBatchIngestSST(t *testing.T) { + defer leaktest.AfterTest(t)() // Verify that Batch.IngestSST has the correct batch count and memtable // size. var b Batch @@ -232,6 +236,7 @@ func TestBatchIngestSST(t *testing.T) { } func TestBatchLen(t *testing.T) { + defer leaktest.AfterTest(t)() var b Batch requireLenAndReprEq := func(size int) { @@ -256,6 +261,7 @@ func TestBatchLen(t *testing.T) { } func TestBatchEmpty(t *testing.T) { + defer leaktest.AfterTest(t)() testBatchEmpty(t, 0) testBatchEmpty(t, defaultBatchInitialSize) testBatchEmpty(t, 0, WithInitialSizeBytes(2<<10), WithMaxRetainedSizeBytes(2<<20)) @@ -323,6 +329,7 @@ func testBatchEmpty(t *testing.T, size int, opts ...BatchOption) { } func TestBatchApplyNoSyncWait(t *testing.T) { + defer leaktest.AfterTest(t)() db, err := Open("", &Options{ FS: vfs.NewMem(), Logger: testutils.Logger{T: t}, @@ -350,6 +357,7 @@ func TestBatchApplyNoSyncWait(t *testing.T) { } func TestBatchReset(t *testing.T) { + defer leaktest.AfterTest(t)() db, err := Open("", &Options{ FS: vfs.NewMem(), Logger: testutils.Logger{T: t}, @@ -422,11 +430,13 @@ func TestBatchReset(t *testing.T) { } func TestBatchReuse(t *testing.T) { + defer leaktest.AfterTest(t)() db, err := Open("", &Options{ FS: vfs.NewMem(), Logger: testutils.Logger{T: t}, }) require.NoError(t, err) + defer db.Close() var buf bytes.Buffer batches := map[string]*Batch{} @@ -504,6 +514,7 @@ func TestBatchReuse(t *testing.T) { } func TestIndexedBatchReset(t *testing.T) { + defer leaktest.AfterTest(t)() indexCount := func(sl *batchskl.Skiplist) int { count := 0 iter := sl.NewIter(nil, nil) @@ -597,6 +608,7 @@ func TestIndexedBatchReset(t *testing.T) { // TestIndexedBatchMutation tests mutating an indexed batch with an open // iterator. func TestIndexedBatchMutation(t *testing.T) { + defer leaktest.AfterTest(t)() opts := &Options{ Comparer: testkeys.Comparer, FS: vfs.NewMem(), @@ -704,6 +716,7 @@ func TestIndexedBatchMutation(t *testing.T) { } func TestIndexedBatch_GlobalVisibility(t *testing.T) { + defer leaktest.AfterTest(t)() opts := &Options{ FS: vfs.NewMem(), FormatMajorVersion: internalFormatNewest, @@ -762,6 +775,7 @@ func TestIndexedBatch_GlobalVisibility(t *testing.T) { } func TestFlushableBatchReset(t *testing.T) { + defer leaktest.AfterTest(t)() var b Batch var err error b.flushable, err = newFlushableBatch(&b, DefaultComparer) @@ -772,6 +786,7 @@ func TestFlushableBatchReset(t *testing.T) { } func TestBatchIncrement(t *testing.T) { + defer leaktest.AfterTest(t)() testCases := []uint32{ 0x00000000, 0x00000001, @@ -836,6 +851,7 @@ func TestBatchIncrement(t *testing.T) { } func TestBatchOpDoesIncrement(t *testing.T) { + defer leaktest.AfterTest(t)() var b Batch key := []byte("foo") value := []byte("bar") @@ -878,6 +894,7 @@ func TestBatchOpDoesIncrement(t *testing.T) { } func TestBatchGet(t *testing.T) { + defer leaktest.AfterTest(t)() testCases := []struct { method string memTableSize uint64 @@ -949,6 +966,7 @@ func TestBatchGet(t *testing.T) { } func TestBatchIter(t *testing.T) { + defer leaktest.AfterTest(t)() var b *Batch for _, method := range []string{"build", "apply"} { @@ -1014,6 +1032,7 @@ func TestBatchIter(t *testing.T) { } func TestBatchRangeOps(t *testing.T) { + defer leaktest.AfterTest(t)() var b *Batch datadriven.RunTest(t, "testdata/batch_range_ops", func(t *testing.T, td *datadriven.TestData) string { @@ -1092,6 +1111,7 @@ func TestBatchRangeOps(t *testing.T) { } func TestBatchTooLarge(t *testing.T) { + defer leaktest.AfterTest(t)() var b Batch var result interface{} func() { @@ -1106,6 +1126,7 @@ func TestBatchTooLarge(t *testing.T) { } func TestFlushableBatchIter(t *testing.T) { + defer leaktest.AfterTest(t)() var b *flushableBatch datadriven.RunTest(t, "testdata/internal_iter_next", func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { @@ -1134,6 +1155,7 @@ func TestFlushableBatchIter(t *testing.T) { } func TestFlushableBatch(t *testing.T) { + defer leaktest.AfterTest(t)() var b *flushableBatch datadriven.RunTest(t, "testdata/flushable_batch", func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { @@ -1222,6 +1244,7 @@ func TestFlushableBatch(t *testing.T) { } func TestFlushableBatchDeleteRange(t *testing.T) { + defer leaktest.AfterTest(t)() var fb *flushableBatch var input string @@ -1283,6 +1306,7 @@ func scanKeyspanIterator(w io.Writer, ki keyspan.FragmentIterator) { } func TestEmptyFlushableBatch(t *testing.T) { + defer leaktest.AfterTest(t)() // Verify that we can create a flushable batch on an empty batch. fb, err := newFlushableBatch(newBatch(nil), DefaultComparer) require.NoError(t, err) @@ -1291,6 +1315,7 @@ func TestEmptyFlushableBatch(t *testing.T) { } func TestBatchCommitStats(t *testing.T) { + defer leaktest.AfterTest(t)() testFunc := func() error { db, err := Open("", &Options{ FS: vfs.NewMem(), @@ -1436,6 +1461,7 @@ func TestBatchCommitStats(t *testing.T) { // TestBatchLogDataMemtableSize tests that LogDatas never contribute to memtable // size. func TestBatchLogDataMemtableSize(t *testing.T) { + defer leaktest.AfterTest(t)() // Create a batch with Set("foo", "bar") and a LogData. Only the Set should // contribute to the batch's memtable size. b := Batch{} @@ -1581,6 +1607,7 @@ func BenchmarkIndexedBatchSetDeferred(b *testing.B) { } func TestBatchMemTableSizeOverflow(t *testing.T) { + defer leaktest.AfterTest(t)() opts := testingRandomized(t, &Options{ FS: vfs.NewMem(), }) @@ -1605,6 +1632,7 @@ func TestBatchMemTableSizeOverflow(t *testing.T) { // TestBatchSpanCaching stress tests the caching of keyspan.Spans for range // tombstones and range keys. func TestBatchSpanCaching(t *testing.T) { + defer leaktest.AfterTest(t)() opts := &Options{ Comparer: testkeys.Comparer, FS: vfs.NewMem(), @@ -1702,6 +1730,7 @@ func TestBatchSpanCaching(t *testing.T) { } func TestBatchOption(t *testing.T) { + defer leaktest.AfterTest(t)() for _, tc := range []struct { name string opts []BatchOption diff --git a/blob_rewrite_test.go b/blob_rewrite_test.go index 823c272d4ac..2e9c17ef540 100644 --- a/blob_rewrite_test.go +++ b/blob_rewrite_test.go @@ -16,6 +16,7 @@ import ( "time" "github.com/cockroachdb/crlib/crstrings" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/blobtest" @@ -35,6 +36,7 @@ import ( ) func TestBlobRewrite(t *testing.T) { + defer leaktest.AfterTest(t)() var ( bv blobtest.Values vs compact.ValueSeparation @@ -56,6 +58,7 @@ func TestBlobRewrite(t *testing.T) { FS: fs, }) require.NoError(t, err) + defer objStore.Close() initRawWriter := func() { if tw != nil { @@ -172,13 +175,19 @@ func TestBlobRewrite(t *testing.T) { } fileCache := NewFileCache(1, 100) + defer fileCache.Unref() + blockCache := NewCache(1024) + defer blockCache.Unref() + blockCacheHandle := blockCache.NewHandle() + defer blockCacheHandle.Close() mockFC := fileCache.newHandle( - nil, + blockCacheHandle, objStore, &base.LoggerWithNoopTracer{Logger: base.DefaultLogger}, sstable.ReaderOptions{}, func(base.ObjectInfo, error) error { return nil }, ) + defer mockFC.Close() var sstables []*manifest.TableMetadata for _, sstFileNum := range sstableFileNums { sst := &manifest.TableMetadata{ @@ -229,6 +238,7 @@ func TestBlobRewrite(t *testing.T) { // sstables as extant references. Each blob rewrite may rewrite the original // blob file, or one of the previous iteration's rewritten blob files. func TestBlobRewriteRandomized(t *testing.T) { + defer leaktest.AfterTest(t)() const numKVs = 1000 const blobFileID = 100000 const numRewrites = 10 diff --git a/checkpoint_test.go b/checkpoint_test.go index fc8353f7e76..77ecd0e032e 100644 --- a/checkpoint_test.go +++ b/checkpoint_test.go @@ -16,6 +16,7 @@ import ( "sync" "testing" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/testutils" @@ -248,6 +249,7 @@ func testCheckpointImpl(t *testing.T, ddFile string, createOnShared bool) { } func TestCopyCheckpointOptions(t *testing.T) { + defer leaktest.AfterTest(t)() fs := vfs.NewMem() datadriven.RunTest(t, "testdata/copy_checkpoint_options", func(t *testing.T, td *datadriven.TestData) string { switch td.Cmd { @@ -275,6 +277,7 @@ func TestCopyCheckpointOptions(t *testing.T) { } func TestCheckpoint(t *testing.T) { + defer leaktest.AfterTest(t)() t.Run("shared=false", func(t *testing.T) { testCheckpointImpl(t, "testdata/checkpoint", false /* createOnShared */) }) @@ -287,6 +290,7 @@ func TestCheckpoint(t *testing.T) { } func TestCheckpointCompaction(t *testing.T) { + defer leaktest.AfterTest(t)() fs := vfs.NewMem() d, err := Open("", &Options{FS: fs, Logger: testutils.Logger{T: t}}) require.NoError(t, err) @@ -369,6 +373,7 @@ func TestCheckpointCompaction(t *testing.T) { } func TestCheckpointFlushWAL(t *testing.T) { + defer leaktest.AfterTest(t)() const checkpointPath = "checkpoints/checkpoint" fs := vfs.NewCrashableMem() opts := &Options{FS: fs, Logger: testutils.Logger{T: t}} @@ -422,6 +427,7 @@ func TestCheckpointFlushWAL(t *testing.T) { } func TestCheckpointManyFiles(t *testing.T) { + defer leaktest.AfterTest(t)() if testing.Short() { t.Skip("skipping because of short flag") } diff --git a/commit_test.go b/commit_test.go index 617a059d8e0..fbd7d578497 100644 --- a/commit_test.go +++ b/commit_test.go @@ -14,6 +14,7 @@ import ( "testing" "time" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/pebble/internal/arenaskl" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/buildtags" @@ -61,6 +62,7 @@ func (e *testCommitEnv) write(b *Batch, wg *sync.WaitGroup, _ *error) (*memTable } func TestCommitQueue(t *testing.T) { + defer leaktest.AfterTest(t)() var q commitQueue var batches [16]Batch for i := range batches { @@ -85,6 +87,7 @@ func TestCommitQueue(t *testing.T) { } func TestCommitPipeline(t *testing.T) { + defer leaktest.AfterTest(t)() var e testCommitEnv p := newCommitPipeline(e.env()) @@ -125,6 +128,7 @@ func TestCommitPipeline(t *testing.T) { } func TestCommitPipelineSync(t *testing.T) { + defer leaktest.AfterTest(t)() n := 10000 if invariants.RaceEnabled { // Under race builds we have to limit the concurrency or we hit the @@ -172,6 +176,7 @@ func TestCommitPipelineSync(t *testing.T) { } func TestCommitPipelineAllocateSeqNum(t *testing.T) { + defer leaktest.AfterTest(t)() var e testCommitEnv p := newCommitPipeline(e.env()) @@ -220,6 +225,7 @@ func (f *syncDelayFile) Sync() error { } func TestCommitPipelineWALClose(t *testing.T) { + defer leaktest.AfterTest(t)() // This test stresses the edge case of N goroutines blocked in the // commitPipeline waiting for the log to sync when we concurrently decide to // rotate and close the log. @@ -302,6 +308,7 @@ func TestCommitPipelineWALClose(t *testing.T) { // also grabbed the first seqnum = 5 before this batch, will ratchet to 5 + 0, // which is a noop. func TestCommitPipelineLogDataSeqNum(t *testing.T) { + defer leaktest.AfterTest(t)() var testEnv commitEnv testEnv = commitEnv{ logSeqNum: new(base.AtomicSeqNum), diff --git a/compaction_picker_test.go b/compaction_picker_test.go index 43b2d564b55..61ca5871af5 100644 --- a/compaction_picker_test.go +++ b/compaction_picker_test.go @@ -20,6 +20,7 @@ import ( "time" "github.com/cockroachdb/crlib/crstrings" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" @@ -254,6 +255,7 @@ func parseCompactionLines( } func TestCompactionPickerByScoreLevelMaxBytes(t *testing.T) { + defer leaktest.AfterTest(t)() datadriven.RunTest(t, "testdata/compaction_picker_level_max_bytes", func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { @@ -276,6 +278,7 @@ func TestCompactionPickerByScoreLevelMaxBytes(t *testing.T) { } func TestCompactionPickerTargetLevel(t *testing.T) { + defer leaktest.AfterTest(t)() var vers *manifest.Version var latest *latestVersionState var opts *Options @@ -528,6 +531,7 @@ func TestCompactionPickerTargetLevel(t *testing.T) { } func TestCompactionPickerEstimatedCompactionDebt(t *testing.T) { + defer leaktest.AfterTest(t)() datadriven.RunTest(t, "testdata/compaction_picker_estimated_debt", func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { @@ -548,6 +552,7 @@ func TestCompactionPickerEstimatedCompactionDebt(t *testing.T) { } func TestCompactionPickerL0(t *testing.T) { + defer leaktest.AfterTest(t)() opts := DefaultOptions() opts.Experimental.L0CompactionConcurrency = 1 @@ -714,6 +719,7 @@ func TestCompactionPickerL0(t *testing.T) { } func TestCompactionPickerConcurrency(t *testing.T) { + defer leaktest.AfterTest(t)() opts := DefaultOptions() opts.Experimental.L0CompactionConcurrency = 1 lowerConcurrencyLimit, upperConcurrencyLimit := 1, 4 @@ -824,6 +830,7 @@ func TestCompactionPickerConcurrency(t *testing.T) { } func TestCompactionPickerPickReadTriggered(t *testing.T) { + defer leaktest.AfterTest(t)() opts := DefaultOptions() var picker *compactionPickerByScore var rcList readCompactionQueue @@ -959,6 +966,7 @@ func (d alwaysMultiLevel) allowL0() bool { return false } func (d alwaysMultiLevel) String() string { return "always" } func TestPickedCompactionSetupInputs(t *testing.T) { + defer leaktest.AfterTest(t)() opts := DefaultOptions() setupInputTest := func(t *testing.T, d *datadriven.TestData) string { @@ -1132,6 +1140,7 @@ func TestPickedCompactionSetupInputs(t *testing.T) { } func TestPickedCompactionExpandInputs(t *testing.T) { + defer leaktest.AfterTest(t)() opts := DefaultOptions() cmp := DefaultComparer.Compare var files []*manifest.TableMetadata @@ -1210,6 +1219,7 @@ func TestPickedCompactionExpandInputs(t *testing.T) { } func TestCompactionOutputFileSize(t *testing.T) { + defer leaktest.AfterTest(t)() opts := DefaultOptions() var picker *compactionPickerByScore @@ -1278,6 +1288,7 @@ func TestCompactionOutputFileSize(t *testing.T) { } func TestCompactionPickerCompensatedSize(t *testing.T) { + defer leaktest.AfterTest(t)() testCases := []struct { size uint64 pointDelEstimateBytes uint64 @@ -1319,6 +1330,7 @@ func TestCompactionPickerCompensatedSize(t *testing.T) { } func TestCompactionPickerPickFile(t *testing.T) { + defer leaktest.AfterTest(t)() var problemSpans *problemspans.ByLevel var d *DB defer func() { @@ -1467,6 +1479,7 @@ func (c *pausableCleaner) resume() { } func TestCompactionPickerScores(t *testing.T) { + defer leaktest.AfterTest(t)() cleaner := newPausableCleaner(DeleteCleaner{}) var d *DB diff --git a/compaction_scheduler_test.go b/compaction_scheduler_test.go index 1da7dec8a4f..f1f1fd02683 100644 --- a/compaction_scheduler_test.go +++ b/compaction_scheduler_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" ) @@ -89,6 +90,7 @@ func (d *testDBForCompaction) compactionDone(index int) { } func TestConcurrencyLimitScheduler(t *testing.T) { + defer leaktest.AfterTest(t)() var tts testTimeSource var sched *ConcurrencyLimitScheduler var db *testDBForCompaction diff --git a/compaction_test.go b/compaction_test.go index 11b707a7863..727d8149cb0 100644 --- a/compaction_test.go +++ b/compaction_test.go @@ -3027,8 +3027,10 @@ var _ errorfs.Injector = &createManifestErrorInjector{} // // Regression test for #1669. func TestCompaction_UpdateVersionFails(t *testing.T) { - // TODO(jackson): Fix the leak. - // defer leaktest.AfterTest(t)() + defer leaktest.AfterTest(t)() + + stopCh := make(chan struct{}) + defer close(stopCh) // flushKeys writes the given keys to the DB, flushing the resulting memtable. var key = []byte("foo") @@ -3040,7 +3042,16 @@ func TestCompaction_UpdateVersionFails(t *testing.T) { err = b.Commit(nil) require.NoError(t, err) // An error from a failing flush is returned asynchronously. - go func() { _ = db.Flush() }() + go func() { + flushDone, err := db.AsyncFlush() + if err != nil { + panic(err) + } + select { + case <-flushDone: + case <-stopCh: + } + }() return <-flushErrC } diff --git a/db_test.go b/db_test.go index b2d086dc5e7..e9572055bb2 100644 --- a/db_test.go +++ b/db_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/cockroachdb/crlib/fifo" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" @@ -52,6 +53,7 @@ func try(initialSleep, maxTotalSleep time.Duration, f func() error) error { } func TestTry(t *testing.T) { + defer leaktest.AfterTest(t)() c := make(chan struct{}) go func() { time.Sleep(1 * time.Millisecond) @@ -85,6 +87,7 @@ func TestTry(t *testing.T) { } func TestBasicReads(t *testing.T) { + defer leaktest.AfterTest(t)() testCases := []struct { dirname string wantMap map[string]string @@ -168,6 +171,7 @@ func TestBasicReads(t *testing.T) { } func TestBasicWrites(t *testing.T) { + defer leaktest.AfterTest(t)() d, err := Open("", testingRandomized(t, &Options{ FS: vfs.NewMem(), })) @@ -306,6 +310,7 @@ func TestBasicWrites(t *testing.T) { } func TestRandomWrites(t *testing.T) { + defer leaktest.AfterTest(t)() d, err := Open("", testingRandomized(t, &Options{ FS: vfs.NewMem(), MemTableSize: 8 * 1024, @@ -359,6 +364,7 @@ func TestRandomWrites(t *testing.T) { } func TestLargeBatch(t *testing.T) { + defer leaktest.AfterTest(t)() d, err := Open("", testingRandomized(t, &Options{ FS: vfs.NewMem(), MemTableSize: 1400, @@ -445,6 +451,7 @@ func TestLargeBatch(t *testing.T) { } func TestGetNoCache(t *testing.T) { + defer leaktest.AfterTest(t)() cache := NewCache(0) defer cache.Unref() @@ -462,6 +469,7 @@ func TestGetNoCache(t *testing.T) { } func TestGetMerge(t *testing.T) { + defer leaktest.AfterTest(t)() d, err := Open("", testingRandomized(t, &Options{ FS: vfs.NewMem(), })) @@ -493,6 +501,7 @@ func TestGetMerge(t *testing.T) { } func TestMergeOrderSameAfterFlush(t *testing.T) { + defer leaktest.AfterTest(t)() // Ensure compaction iterator (used by flush) and user iterator process merge // operands in the same order d, err := Open("", testingRandomized(t, &Options{ @@ -553,6 +562,7 @@ func (m *closableMerger) Close() error { } func TestMergerClosing(t *testing.T) { + defer leaktest.AfterTest(t)() m := &closableMerger{} d, err := Open("", testingRandomized(t, &Options{ @@ -583,6 +593,7 @@ func TestMergerClosing(t *testing.T) { } func TestLogData(t *testing.T) { + defer leaktest.AfterTest(t)() d, err := Open("", testingRandomized(t, &Options{ FS: vfs.NewMem(), })) @@ -599,6 +610,7 @@ func TestLogData(t *testing.T) { } func TestSingleDeleteGet(t *testing.T) { + defer leaktest.AfterTest(t)() d, err := Open("", testingRandomized(t, &Options{ FS: vfs.NewMem(), })) @@ -624,6 +636,7 @@ func TestSingleDeleteGet(t *testing.T) { } func TestSingleDeleteFlush(t *testing.T) { + defer leaktest.AfterTest(t)() d, err := Open("", testingRandomized(t, &Options{ FS: vfs.NewMem(), })) @@ -657,6 +670,7 @@ func TestSingleDeleteFlush(t *testing.T) { } func TestUnremovableSingleDelete(t *testing.T) { + defer leaktest.AfterTest(t)() d, err := Open("", testingRandomized(t, &Options{ FS: vfs.NewMem(), L0CompactionThreshold: 8, @@ -692,6 +706,7 @@ func TestUnremovableSingleDelete(t *testing.T) { } func TestIterLeak(t *testing.T) { + defer leaktest.AfterTest(t)() for _, leak := range []bool{true, false} { t.Run(fmt.Sprintf("leak=%t", leak), func(t *testing.T) { for _, flush := range []bool{true, false} { @@ -732,6 +747,10 @@ func TestIterLeak(t *testing.T) { // Make sure that we detect an iter leak when only one DB closes // while the second db still holds a reference to the FileCache. func TestIterLeakSharedCache(t *testing.T) { + // TODO(radu): this test leaks a file cache goroutine. The goroutine is + // blocked and won't span other goroutines so this won't affect leaked + // goroutine detection in other tests. + // defer leaktest.AfterTest(t)() for _, leak := range []bool{true, false} { t.Run(fmt.Sprintf("leak=%t", leak), func(t *testing.T) { for _, flush := range []bool{true, false} { @@ -821,6 +840,7 @@ func TestIterLeakSharedCache(t *testing.T) { } func TestMemTableReservation(t *testing.T) { + defer leaktest.AfterTest(t)() opts := &Options{ Cache: NewCache(128 << 10 /* 128 KB */), // We're going to be looking at and asserting the global memtable reservation @@ -895,6 +915,7 @@ func TestMemTableReservation(t *testing.T) { } func TestMemTableReservationLeak(t *testing.T) { + defer leaktest.AfterTest(t)() d, err := Open("", testingRandomized(t, &Options{FS: vfs.NewMem()})) require.NoError(t, err) @@ -915,6 +936,7 @@ func TestMemTableReservationLeak(t *testing.T) { } func TestCacheEvict(t *testing.T) { + defer leaktest.AfterTest(t)() cache := NewCache(10 << 20) defer cache.Unref() @@ -954,6 +976,7 @@ func TestCacheEvict(t *testing.T) { } func TestFlushEmpty(t *testing.T) { + defer leaktest.AfterTest(t)() d, err := Open("", testingRandomized(t, &Options{ FS: vfs.NewMem(), })) @@ -965,6 +988,7 @@ func TestFlushEmpty(t *testing.T) { } func TestRollManifest(t *testing.T) { + defer leaktest.AfterTest(t)() toPreserve := rand.Int32N(5) + 1 opts := &Options{ MaxManifestFileSize: 1, @@ -1112,6 +1136,7 @@ func TestRollManifest(t *testing.T) { } func TestDBClosed(t *testing.T) { + defer leaktest.AfterTest(t)() d, err := Open("", &Options{ FS: vfs.NewMem(), }) @@ -1152,6 +1177,7 @@ func TestDBClosed(t *testing.T) { } func TestDBConcurrentCommitCompactFlush(t *testing.T) { + defer leaktest.AfterTest(t)() d, err := Open("", testingRandomized(t, &Options{ FS: vfs.NewMem(), })) @@ -1184,6 +1210,7 @@ func TestDBConcurrentCommitCompactFlush(t *testing.T) { } func TestDBConcurrentCompactClose(t *testing.T) { + defer leaktest.AfterTest(t)() // Test closing while a compaction is ongoing. This ensures compaction code // detects the close and finishes cleanly. mem := vfs.NewMem() @@ -1217,6 +1244,7 @@ func TestDBConcurrentCompactClose(t *testing.T) { } func TestDBApplyBatchNilDB(t *testing.T) { + defer leaktest.AfterTest(t)() d, err := Open("", testingRandomized(t, &Options{FS: vfs.NewMem()})) require.NoError(t, err) @@ -1237,6 +1265,7 @@ func TestDBApplyBatchNilDB(t *testing.T) { } func TestDBApplyBatchMismatch(t *testing.T) { + defer leaktest.AfterTest(t)() srcDB, err := Open("", testingRandomized(t, &Options{FS: vfs.NewMem()})) require.NoError(t, err) @@ -1263,6 +1292,7 @@ func TestDBApplyBatchMismatch(t *testing.T) { } func TestCloseCleanerRace(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() for i := 0; i < 20; i++ { db, err := Open("", testingRandomized(t, &Options{FS: mem})) @@ -1300,6 +1330,7 @@ func TestCloseCleanerRace(t *testing.T) { } func TestSSTablesWithApproximateSpanBytes(t *testing.T) { + defer leaktest.AfterTest(t)() d, err := Open("", testingRandomized(t, &Options{ FS: vfs.NewMem(), })) @@ -1341,6 +1372,7 @@ func TestSSTablesWithApproximateSpanBytes(t *testing.T) { } func TestFilterSSTablesWithOption(t *testing.T) { + defer leaktest.AfterTest(t)() d, err := Open("", testingRandomized(t, &Options{ FS: vfs.NewMem(), })) @@ -1381,6 +1413,7 @@ func TestFilterSSTablesWithOption(t *testing.T) { } func TestSSTables(t *testing.T) { + defer leaktest.AfterTest(t)() d, err := Open("", testingRandomized(t, &Options{ FS: vfs.NewMem(), })) @@ -1418,6 +1451,7 @@ func TestSSTables(t *testing.T) { } func TestVirtualSSTables(t *testing.T) { + defer leaktest.AfterTest(t)() d, err := Open("", testingRandomized(t, &Options{ FS: vfs.NewMem(), FormatMajorVersion: FormatTableFormatV6, @@ -1496,6 +1530,7 @@ func (t *testTracer) IsTracingEnabled(ctx context.Context) bool { } func TestTracing(t *testing.T) { + defer leaktest.AfterTest(t)() defer base.DeterministicReadDurationForTesting()() var tracer testTracer @@ -1570,6 +1605,7 @@ func TestTracing(t *testing.T) { } func TestMemtableIngestInversion(t *testing.T) { + defer leaktest.AfterTest(t)() memFS := vfs.NewMem() opts := &Options{ FS: memFS, @@ -2104,6 +2140,7 @@ func BenchmarkRotateMemtables(b *testing.B) { // TODO(sumeer): rewrite test when LogRecycler is hidden from this package. func TestRecycleLogs(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() d, err := Open("", &Options{ FS: mem, @@ -2192,6 +2229,7 @@ func newBlockingFS(fs vfs.FS, blockWAL, blockSST bool) *sstAndLogFileBlockingFS } func TestWALFailoverAvoidsWriteStall(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() // All sst and log creation is blocked. primaryFS := newBlockingFS(mem, true /*blockWAL*/, true /*blockSST*/) @@ -2238,6 +2276,7 @@ func (tlm *testLogManager) ElevateWriteStallThresholdForFailover() bool { } func TestElevateThresholdAfterWriteStallUnblocksStall(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() // All sst writes are blocked. blockingFS := newBlockingFS(mem, false /*blockWAL*/, true /*blockSST*/) @@ -2301,6 +2340,7 @@ func TestElevateThresholdAfterWriteStallUnblocksStall(t *testing.T) { // re-run the sequence introducing latencies, reorderings, parallelism, etc, // ensuring that all re-runs produce the same output. func TestDeterminism(t *testing.T) { + defer leaktest.AfterTest(t)() var d *DB var fs vfs.FS = vfs.NewMem() defer func() { @@ -2345,6 +2385,9 @@ func TestDeterminism(t *testing.T) { DisableAutomaticCompactions: true, } opts.Experimental.IngestSplit = func() bool { return rand.IntN(2) == 1 } + if d != nil { + require.NoError(t, d.Close()) + } var err error if d, err = runDBDefineCmd(td, opts); err != nil { return err.Error() @@ -2591,6 +2634,7 @@ func (f *readTrackFile) ReadAt(p []byte, off int64) (n int, err error) { } func TestLoadBlockSema(t *testing.T) { + defer leaktest.AfterTest(t)() fs := &readTrackFS{FS: vfs.NewMem()} sema := fifo.NewSemaphore(100) db, err := Open("", testingRandomized(t, &Options{ @@ -2600,6 +2644,7 @@ func TestLoadBlockSema(t *testing.T) { Logger: testutils.Logger{T: t}, })) require.NoError(t, err) + defer db.Close() key := func(i, j int) []byte { return []byte(fmt.Sprintf("%02d/%02d", i, j)) diff --git a/disk_usage_test.go b/disk_usage_test.go index baebaf778fc..2dfcdbdb68c 100644 --- a/disk_usage_test.go +++ b/disk_usage_test.go @@ -8,6 +8,7 @@ import ( "fmt" "testing" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble/objstorage/remote" "github.com/cockroachdb/pebble/vfs" @@ -16,6 +17,7 @@ import ( // Test that the EstimateDiskUsage and EstimateDiskUsageByBackingType should panic when the DB is closed func TestEstimateDiskUsageClosedDB(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() d, err := Open("", &Options{FS: mem}) require.NoError(t, err) @@ -32,6 +34,7 @@ func TestEstimateDiskUsageClosedDB(t *testing.T) { // Test the EstimateDiskUsage and EstimateDiskUsageByBackingType data driven tests func TestEstimateDiskUsageDataDriven(t *testing.T) { + defer leaktest.AfterTest(t)() fs := vfs.NewMem() remoteStorage := remote.NewInMem() var d *DB diff --git a/download_test.go b/download_test.go index a08edc4e209..2a963722cce 100644 --- a/download_test.go +++ b/download_test.go @@ -12,6 +12,7 @@ import ( "strings" "testing" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/manifest" @@ -99,6 +100,7 @@ func TestDownloadCursor(t *testing.T) { } func TestDownloadTask(t *testing.T) { + defer leaktest.AfterTest(t)() cmp := bytes.Compare objProvider := initDownloadTestProvider(t) d := DB{ diff --git a/error_test.go b/error_test.go index e4ba9f62266..e6d17d57ce4 100644 --- a/error_test.go +++ b/error_test.go @@ -16,6 +16,7 @@ import ( "testing" "time" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/testkeys" @@ -101,6 +102,7 @@ func expectLSM(expected string, d *DB, t *testing.T) { // TestErrors repeatedly runs a short sequence of operations, injecting FS // errors at different points, until success is achieved. func TestErrors(t *testing.T) { + defer leaktest.AfterTest(t)() run := func(fs *errorfs.FS) (err error) { defer func() { if r := recover(); r != nil { @@ -119,6 +121,11 @@ func TestErrors(t *testing.T) { if err != nil { return err } + defer func() { + if d != nil { + d.Close() + } + }() key := []byte("a") value := []byte("b") @@ -138,7 +145,9 @@ func TestErrors(t *testing.T) { if err := iter.Close(); err != nil { return err } - return d.Close() + err = d.Close() + d = nil + return err } errorCounts := make(map[string]int) @@ -174,6 +183,7 @@ func TestErrors(t *testing.T) { // cannot require operations fail since it involves flush/compaction, which retry // internally and succeed following an injected error. func TestRequireReadError(t *testing.T) { + defer leaktest.AfterTest(t)() run := func(formatVersion FormatMajorVersion, index int32) (err error) { // Perform setup with error injection disabled as it involves writes/background ops. ii := errorfs.OnIndex(-1) @@ -265,6 +275,7 @@ L6: // corruption and return an error. In this case the filesystem reads return // successful status but the data they return is corrupt. func TestCorruptReadError(t *testing.T) { + defer leaktest.AfterTest(t)() run := func(formatVersion FormatMajorVersion, index int32) (err error) { // Perform setup with corruption injection disabled as it involves writes/background ops. fs := &corruptFS{ @@ -355,6 +366,7 @@ L6: } func TestDBWALRotationCrash(t *testing.T) { + defer leaktest.AfterTest(t)() memfs := vfs.NewCrashableMem() var crashFS *vfs.MemFS @@ -419,6 +431,7 @@ func TestDBWALRotationCrash(t *testing.T) { } func TestDBCompactionCrash(t *testing.T) { + defer leaktest.AfterTest(t)() seed := time.Now().UnixNano() t.Log("seed", seed) diff --git a/event_listener_test.go b/event_listener_test.go index cfb80ff173c..42fb8a42585 100644 --- a/event_listener_test.go +++ b/event_listener_test.go @@ -18,6 +18,7 @@ import ( "time" "github.com/cockroachdb/crlib/crtime" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" @@ -31,6 +32,7 @@ import ( // Verify event listener actions, as well as expected filesystem operations. func TestEventListener(t *testing.T) { + defer leaktest.AfterTest(t)() if runtime.GOARCH == "386" { t.Skip("skipped on 32-bit due to slightly varied output") } @@ -246,6 +248,7 @@ func TestEventListener(t *testing.T) { } func TestWriteStallEvents(t *testing.T) { + defer leaktest.AfterTest(t)() const flushCount = 10 const writeStallEnd = "write stall ending" @@ -345,6 +348,7 @@ func (l redactLogger) Fatalf(format string, args ...interface{}) { } func TestEventListenerRedact(t *testing.T) { + defer leaktest.AfterTest(t)() // The vast majority of event listener fields logged are safe and do not // need to be redacted. Verify that the rare, unsafe error does appear in // the log redacted. @@ -359,12 +363,14 @@ func TestEventListenerRedact(t *testing.T) { } func TestEventListenerEnsureDefaultsSetsAllCallbacks(t *testing.T) { + defer leaktest.AfterTest(t)() e := EventListener{} e.EnsureDefaults(nil) testAllCallbacksSetInEventListener(t, e) } func TestMakeLoggingEventListenerSetsAllCallbacks(t *testing.T) { + defer leaktest.AfterTest(t)() e := MakeLoggingEventListener(nil) testAllCallbacksSetInEventListener(t, e) } @@ -409,6 +415,7 @@ func newCountingMockLogger(t *testing.T) (*mockLogger, *int, *int) { } func TestMakeLoggingEventListenerBackgroundErrorOtherError(t *testing.T) { + defer leaktest.AfterTest(t)() mockLogger, infoCount, errorCount := newCountingMockLogger(t) e := MakeLoggingEventListener(mockLogger) @@ -420,6 +427,7 @@ func TestMakeLoggingEventListenerBackgroundErrorOtherError(t *testing.T) { } func TestTeeEventListenerSetsAllCallbacks(t *testing.T) { + defer leaktest.AfterTest(t)() e := TeeEventListener(EventListener{}, EventListener{}) testAllCallbacksSetInEventListener(t, e) } @@ -436,6 +444,7 @@ func testAllCallbacksSetInEventListener(t *testing.T, e EventListener) { } func TestLowDiskReporter(t *testing.T) { + defer leaktest.AfterTest(t)() const totalBytes = 1000 testCases := []struct { // time, as a fraction of lowDiskSpaceFrequency. @@ -480,6 +489,7 @@ func TestLowDiskReporter(t *testing.T) { } func TestLowDiskSpaceEvent(t *testing.T) { + defer leaktest.AfterTest(t)() var lastInfo atomic.Value listener := &EventListener{ @@ -535,6 +545,7 @@ func (fs *mockDiskUsageFS) GetDiskUsage(path string) (vfs.DiskUsage, error) { } func TestSSTCorruptionEvent(t *testing.T) { + defer leaktest.AfterTest(t)() for _, test := range []string{"missing-file", "missing-before-open", "meta-block-corruption", "data-block-corruption"} { t.Run(test, func(t *testing.T) { var mu sync.Mutex @@ -613,6 +624,7 @@ func TestSSTCorruptionEvent(t *testing.T) { } func TestBlobCorruptionEvent(t *testing.T) { + defer leaktest.AfterTest(t)() for _, test := range []string{"missing-file"} { t.Run(test, func(t *testing.T) { var mu sync.Mutex diff --git a/excise_test.go b/excise_test.go index 0523589813c..5f260cb5218 100644 --- a/excise_test.go +++ b/excise_test.go @@ -14,6 +14,7 @@ import ( "testing" "time" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/keyspan" @@ -28,6 +29,7 @@ import ( ) func TestExcise(t *testing.T) { + defer leaktest.AfterTest(t)() var mem vfs.FS var d *DB var flushed bool @@ -355,6 +357,7 @@ func TestExcise(t *testing.T) { } func TestConcurrentExcise(t *testing.T) { + defer leaktest.AfterTest(t)() var d, d1, d2 *DB var efos map[string]*EventuallyFileOnlySnapshot compactionErrs := make(chan error, 5) @@ -780,6 +783,7 @@ func TestConcurrentExcise(t *testing.T) { } func TestExciseBounds(t *testing.T) { + defer leaktest.AfterTest(t)() const sstPath = "foo.sst" var fs vfs.FS var m *manifest.TableMetadata diff --git a/external_iterator_test.go b/external_iterator_test.go index 59e91572a3c..4783d3e8ebc 100644 --- a/external_iterator_test.go +++ b/external_iterator_test.go @@ -11,6 +11,7 @@ import ( "slices" "testing" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/blobtest" @@ -22,6 +23,7 @@ import ( ) func TestExternalIterator(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() o := &Options{ FS: mem, diff --git a/external_test.go b/external_test.go index 47f5cd09ab9..1ba21013884 100644 --- a/external_test.go +++ b/external_test.go @@ -16,6 +16,7 @@ import ( "testing/quick" "time" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/pebble" "github.com/cockroachdb/pebble/bloom" "github.com/cockroachdb/pebble/cockroachkvs" @@ -36,6 +37,8 @@ import ( // expects the error to surface to the operation output. If it doesn't, the test // fails. func TestIteratorErrors(t *testing.T) { + // TODO(radu): fix goroutine leak. + //defer leaktest.AfterTest(t)() seed := time.Now().UnixNano() t.Logf("Using seed %d", seed) rng := rand.New(rand.NewPCG(0, uint64(seed))) @@ -265,6 +268,7 @@ func buildSeparatedValuesDB( // This test was used to reproduce the failure in // https://github.com/cockroachdb/cockroach/issues/148419. func TestDoubleRestart(t *testing.T) { + defer leaktest.AfterTest(t)() seed := time.Now().UnixNano() t.Logf("Using seed %d", seed) rng := rand.New(rand.NewPCG(0, uint64(seed))) @@ -412,6 +416,7 @@ func checkKVs( } func TestOptionsClone(t *testing.T) { + defer leaktest.AfterTest(t)() seed := time.Now().UnixNano() t.Logf("Using seed %d", seed) rng := rand.New(rand.NewPCG(0, uint64(seed))) diff --git a/file_cache_test.go b/file_cache_test.go index 3209f3e94ae..d21c766d00a 100644 --- a/file_cache_test.go +++ b/file_cache_test.go @@ -15,6 +15,7 @@ import ( "testing" "time" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/cache" @@ -258,6 +259,7 @@ func (t *fileCacheTest) newTestHandle() (*fileCacheHandle, *fileCacheTestFS) { // Test basic reference counting for the file cache. func TestFileCacheRefs(t *testing.T) { + defer leaktest.AfterTest(t)() fct := newFileCacheTest(t, 8<<20, 10, 2) // We don't call the full fct.cleanup() method because we will unref the // fileCache in the test. @@ -305,6 +307,7 @@ func TestFileCacheRefs(t *testing.T) { // Basic test to determine if reads through the file cache are wired correctly. func TestVirtualReadsWiring(t *testing.T) { + defer leaktest.AfterTest(t)() d, err := Open("", &Options{ FS: vfs.NewMem(), @@ -458,6 +461,7 @@ func TestVirtualReadsWiring(t *testing.T) { // The file cache shouldn't be usable after all the dbs close. func TestSharedFileCacheUseAfterAllFree(t *testing.T) { + defer leaktest.AfterTest(t)() fct := newFileCacheTest(t, 8<<20, 10, 1) // We don't call the full fct.cleanup() method because we will unref the // fileCache in the test. @@ -514,6 +518,7 @@ func TestSharedFileCacheUseAfterAllFree(t *testing.T) { // TestSharedFileCacheUseAfterOneFree tests whether a shared file cache is // usable by a db, after one of the db's releases its reference. func TestSharedFileCacheUseAfterOneFree(t *testing.T) { + defer leaktest.AfterTest(t)() fct := newFileCacheTest(t, 8<<20, 10, 1) defer fct.cleanup() @@ -559,6 +564,7 @@ func TestSharedFileCacheUseAfterOneFree(t *testing.T) { // TestSharedFileCacheUsable ensures that a shared file cache is usable by more // than one database at once. func TestSharedFileCacheUsable(t *testing.T) { + defer leaktest.AfterTest(t)() fct := newFileCacheTest(t, 8<<20, 10, 1) defer fct.cleanup() @@ -603,6 +609,7 @@ func TestSharedFileCacheUsable(t *testing.T) { } func TestSharedTableConcurrent(t *testing.T) { + defer leaktest.AfterTest(t)() fct := newFileCacheTest(t, 8<<20, 10, 1) defer fct.cleanup() @@ -715,8 +722,14 @@ func testFileCacheRandomAccess(t *testing.T, concurrent bool) { fs.validateAndCloseHandle(t, h, nil) } -func TestFileCacheRandomAccessSequential(t *testing.T) { testFileCacheRandomAccess(t, false) } -func TestFileCacheRandomAccessConcurrent(t *testing.T) { testFileCacheRandomAccess(t, true) } +func TestFileCacheRandomAccessSequential(t *testing.T) { + defer leaktest.AfterTest(t)() + testFileCacheRandomAccess(t, false) +} +func TestFileCacheRandomAccessConcurrent(t *testing.T) { + defer leaktest.AfterTest(t)() + testFileCacheRandomAccess(t, true) +} func testFileCacheFrequentlyUsedInternal(t *testing.T, rangeIter bool) { const ( @@ -770,6 +783,7 @@ func testFileCacheFrequentlyUsedInternal(t *testing.T, rangeIter bool) { } func TestFileCacheFrequentlyUsed(t *testing.T) { + defer leaktest.AfterTest(t)() for i, iterType := range []string{"point", "range"} { t.Run(fmt.Sprintf("iter=%s", iterType), func(t *testing.T) { testFileCacheFrequentlyUsedInternal(t, i == 1) @@ -778,6 +792,7 @@ func TestFileCacheFrequentlyUsed(t *testing.T) { } func TestSharedFileCacheFrequentlyUsed(t *testing.T) { + defer leaktest.AfterTest(t)() const ( N = 1000 pinned0 = 7 @@ -909,6 +924,7 @@ func testFileCacheEvictionsInternal(t *testing.T, rangeIter bool) { } func TestFileCacheEvictions(t *testing.T) { + defer leaktest.AfterTest(t)() for i, iterType := range []string{"point", "range"} { t.Run(fmt.Sprintf("iter=%s", iterType), func(t *testing.T) { testFileCacheEvictionsInternal(t, i == 1) @@ -917,6 +933,7 @@ func TestFileCacheEvictions(t *testing.T) { } func TestSharedFileCacheEvictions(t *testing.T) { + defer leaktest.AfterTest(t)() const ( N = 1000 lo, hi = 10, 20 @@ -1011,6 +1028,7 @@ func TestSharedFileCacheEvictions(t *testing.T) { } func TestFileCacheIterLeak(t *testing.T) { + defer leaktest.AfterTest(t)() fct := newFileCacheTest(t, 8<<20, fileCacheTestCacheSize, []int{1, 2, 4, 10}[rand.IntN(4)]) defer fct.cleanup() h, _ := fct.newTestHandle() @@ -1033,6 +1051,9 @@ func TestFileCacheIterLeak(t *testing.T) { } func TestSharedFileCacheIterLeak(t *testing.T) { + // TODO(radu): figure out how we could test this without leaking goroutines. + // defer leaktest.AfterTest(t)() + fct := newFileCacheTest(t, 8<<20, fileCacheTestCacheSize, []int{1, 2, 4, 10}[rand.IntN(4)]) // We don't call the full fct.cleanup() method because we will unref the // fileCache in the test. @@ -1073,6 +1094,7 @@ func TestSharedFileCacheIterLeak(t *testing.T) { } func TestFileCacheRetryAfterFailure(t *testing.T) { + defer leaktest.AfterTest(t)() ctx := context.Background() // Test a retry can succeed after a failure, i.e., errors are not cached. fct := newFileCacheTest(t, 8<<20, fileCacheTestCacheSize, []int{1, 2, 4, 10}[rand.IntN(4)]) @@ -1117,6 +1139,7 @@ func TestFileCacheRetryAfterFailure(t *testing.T) { } func TestFileCacheErrorBadMagicNumber(t *testing.T) { + defer leaktest.AfterTest(t)() obj := &objstorage.MemObj{} tw := sstable.NewWriter(obj, sstable.WriterOptions{TableFormat: sstable.TableFormatPebblev2}) tw.Set([]byte("a"), nil) @@ -1158,6 +1181,7 @@ func TestFileCacheErrorBadMagicNumber(t *testing.T) { } func TestFileCacheEvictClose(t *testing.T) { + defer leaktest.AfterTest(t)() errs := make(chan error, 10) db, err := Open("test", &Options{ @@ -1221,6 +1245,7 @@ func BenchmarkNewItersAlloc(b *testing.B) { // TestFileCacheNoSuchFileError verifies that when the file cache hits a "no // such file" error, it generates a useful fatal message. func TestFileCacheNoSuchFileError(t *testing.T) { + defer leaktest.AfterTest(t)() const dirname = "test" mem := vfs.NewMem() logger := &catchFatalLogger{} diff --git a/filenames_test.go b/filenames_test.go index 08f04bbee60..af23f1efdc6 100644 --- a/filenames_test.go +++ b/filenames_test.go @@ -7,6 +7,7 @@ package pebble import ( "testing" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/testutils" @@ -19,6 +20,7 @@ import ( // the filesystem. These temporary files should be cleaned // up on Open. func TestSetCurrentFileCrash(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() // Initialize a fresh database to write the initial MANIFEST. diff --git a/flush_test.go b/flush_test.go index 0031420e9b2..976d7f79a09 100644 --- a/flush_test.go +++ b/flush_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/vfs" @@ -16,6 +17,7 @@ import ( ) func TestManualFlush(t *testing.T) { + defer leaktest.AfterTest(t)() getOptions := func() *Options { opts := &Options{ FS: vfs.NewMem(), @@ -95,6 +97,7 @@ func TestManualFlush(t *testing.T) { // TestFlushDelRangeEmptyKey tests flushing a range tombstone that begins with // an empty key. The empty key is a valid key but can be confused with nil. func TestFlushDelRangeEmptyKey(t *testing.T) { + defer leaktest.AfterTest(t)() d, err := Open("", &Options{FS: vfs.NewMem()}) require.NoError(t, err) require.NoError(t, d.DeleteRange([]byte{}, []byte("z"), nil)) @@ -105,6 +108,7 @@ func TestFlushDelRangeEmptyKey(t *testing.T) { // TestFlushEmptyKey tests that flushing an empty key does not trigger that key // order invariant assertions. func TestFlushEmptyKey(t *testing.T) { + defer leaktest.AfterTest(t)() d, err := Open("", &Options{FS: vfs.NewMem()}) require.NoError(t, err) require.NoError(t, d.Set(nil, []byte("hello"), nil)) diff --git a/flushable_test.go b/flushable_test.go index abfc9105f56..5e1cb7e9158 100644 --- a/flushable_test.go +++ b/flushable_test.go @@ -7,6 +7,7 @@ import ( "strings" "testing" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/manifest" @@ -17,6 +18,7 @@ import ( // Simple sanity tests for the flushable interface implementation for ingested // sstables. func TestIngestedSSTFlushableAPI(t *testing.T) { + defer leaktest.AfterTest(t)() var mem vfs.FS var d *DB defer func() { diff --git a/format_major_version.go b/format_major_version.go index 843c935c44e..c22330e094b 100644 --- a/format_major_version.go +++ b/format_major_version.go @@ -411,19 +411,26 @@ func lookupFormatMajorVersion( if versString == "" { return FormatDefault, m, nil } - v, err := strconv.ParseUint(versString, 10, 64) + vers, err := func() (FormatMajorVersion, error) { + v, err := strconv.ParseUint(versString, 10, 64) + if err != nil { + return 0, errors.Wrap(err, "parsing format major version") + } + vers := FormatMajorVersion(v) + if vers == FormatDefault { + return 0, errors.Newf("pebble: default format major version should not persisted", vers) + } + if vers > internalFormatNewest { + return 0, errors.Newf("pebble: database %q written in unknown format major version %d", dirname, vers) + } + if vers < FormatMinSupported { + return 0, errors.Newf("pebble: database %q written in format major version %d which is no longer supported", dirname, vers) + } + return vers, nil + }() if err != nil { - return 0, nil, errors.Wrap(err, "parsing format major version") - } - vers := FormatMajorVersion(v) - if vers == FormatDefault { - return 0, nil, errors.Newf("pebble: default format major version should not persisted", vers) - } - if vers > internalFormatNewest { - return 0, nil, errors.Newf("pebble: database %q written in unknown format major version %d", dirname, vers) - } - if vers < FormatMinSupported { - return 0, nil, errors.Newf("pebble: database %q written in format major version %d which is no longer supported", dirname, vers) + _ = m.Close() + return 0, nil, err } return vers, m, nil } diff --git a/format_major_version_test.go b/format_major_version_test.go index 3ded63fe725..4b02d70f905 100644 --- a/format_major_version_test.go +++ b/format_major_version_test.go @@ -9,6 +9,7 @@ import ( "fmt" "testing" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/pebble/internal/testutils" "github.com/cockroachdb/pebble/sstable" "github.com/cockroachdb/pebble/sstable/blob" @@ -20,6 +21,7 @@ import ( // TestFormatMajorVersionValues checks that we don't accidentally change the // numbers of format versions. func TestFormatMajorVersionStableValues(t *testing.T) { + defer leaktest.AfterTest(t)() require.Equal(t, FormatDefault, FormatMajorVersion(0)) require.Equal(t, FormatFlushableIngest, FormatMajorVersion(13)) @@ -46,6 +48,7 @@ func TestFormatMajorVersionStableValues(t *testing.T) { } func TestFormatMajorVersion_MigrationDefined(t *testing.T) { + defer leaktest.AfterTest(t)() for v := FormatMinSupported; v <= FormatNewest; v++ { if _, ok := formatMajorVersionMigrations[v]; !ok { t.Errorf("format major version %d has no migration defined", v) @@ -54,6 +57,8 @@ func TestFormatMajorVersion_MigrationDefined(t *testing.T) { } func TestRatchetFormat(t *testing.T) { + // TODO(radu): fix goroutine leak from diskHealthCheckingFS on failed Open. + // defer leaktest.AfterTest(t)() fs := vfs.NewMem() opts := &Options{FS: fs} opts.WithFSDefaults() @@ -118,6 +123,7 @@ func testBasicDB(d *DB) error { } func TestFormatMajorVersions(t *testing.T) { + defer leaktest.AfterTest(t)() for vers := FormatMinSupported; vers <= FormatNewest; vers++ { t.Run(fmt.Sprintf("vers=%03d", vers), func(t *testing.T) { fs := vfs.NewCrashableMem() @@ -210,6 +216,7 @@ func TestFormatMajorVersions(t *testing.T) { } func TestFormatMajorVersions_TableFormat(t *testing.T) { + defer leaktest.AfterTest(t)() // NB: This test is intended to validate the mapping between every // FormatMajorVersion and sstable.TableFormat exhaustively. This serves as a // sanity check that new versions have a corresponding mapping. The test @@ -249,6 +256,7 @@ func TestFormatMajorVersions_TableFormat(t *testing.T) { } func TestFormatMajorVersions_BlobFileFormat(t *testing.T) { + defer leaktest.AfterTest(t)() // NB: This test is intended to validate the mapping between every // FormatMajorVersion and blob.FileFormat exhaustively. This serves as a // sanity check that new versions have a corresponding mapping. The test @@ -274,6 +282,7 @@ func TestFormatMajorVersions_BlobFileFormat(t *testing.T) { } func TestFormatMajorVersions_MaxTableFormat(t *testing.T) { + defer leaktest.AfterTest(t)() type testCase struct { fmv FormatMajorVersion want sstable.TableFormat diff --git a/get_test.go b/get_test.go index 86314b49fb5..bbddd6f2b10 100644 --- a/get_test.go +++ b/get_test.go @@ -9,6 +9,7 @@ import ( "strings" "testing" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/manifest" @@ -17,6 +18,7 @@ import ( ) func TestGetIter(t *testing.T) { + defer leaktest.AfterTest(t)() // testTable is a table to insert into a version. // Each element of data is a string of the form "internalKey value". type testTable struct { diff --git a/ingest_test.go b/ingest_test.go index 301160f60dd..d326ca9a683 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" "github.com/cockroachdb/errors/oserror" @@ -48,6 +49,7 @@ import ( ) func TestSSTableKeyCompare(t *testing.T) { + defer leaktest.AfterTest(t)() var buf bytes.Buffer datadriven.RunTest(t, "testdata/sstable_key_compare", func(t *testing.T, td *datadriven.TestData) string { switch td.Cmd { @@ -77,6 +79,8 @@ func TestSSTableKeyCompare(t *testing.T) { } func TestIngestLoad(t *testing.T) { + // TODO(radu): fix goroutine leak from diskHealthCheckingFS on failed Open. + // defer leaktest.AfterTest(t)() mem := vfs.NewMem() keySchema := colblk.DefaultKeySchema(testkeys.Comparer, 16) @@ -169,6 +173,7 @@ func TestIngestLoad(t *testing.T) { } func TestIngestLoadRand(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() rng := rand.New(rand.NewPCG(0, uint64(time.Now().UnixNano()))) cmp := DefaultComparer.Compare @@ -262,6 +267,7 @@ func TestIngestLoadRand(t *testing.T) { } func TestIngestLoadInvalid(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() f, err := mem.Create("invalid", vfs.WriteCategoryUnspecified) require.NoError(t, err) @@ -278,6 +284,7 @@ func TestIngestLoadInvalid(t *testing.T) { } func TestIngestSortAndVerify(t *testing.T) { + defer leaktest.AfterTest(t)() comparers := map[string]Compare{ "default": DefaultComparer.Compare, "reverse": func(a, b []byte) int { @@ -332,6 +339,7 @@ func TestIngestSortAndVerify(t *testing.T) { } func TestIngestLink(t *testing.T) { + defer leaktest.AfterTest(t)() // Test linking of tables into the DB directory. Test cleanup when one of the // tables cannot be linked. @@ -342,6 +350,7 @@ func TestIngestLink(t *testing.T) { opts := &Options{FS: vfs.NewMem()} opts.EnsureDefaults() opts.WithFSDefaults() + defer opts.private.fsCloser.Close() require.NoError(t, opts.FS.MkdirAll(dir, 0755)) objProvider, err := objstorageprovider.Open(objstorageprovider.DefaultSettings(opts.FS, dir)) require.NoError(t, err) @@ -419,6 +428,7 @@ func TestIngestLink(t *testing.T) { } func TestIngestLinkFallback(t *testing.T) { + defer leaktest.AfterTest(t)() // Verify that ingestLink succeeds if linking fails by falling back to // copying. mem := vfs.NewMem() @@ -428,6 +438,7 @@ func TestIngestLinkFallback(t *testing.T) { opts := &Options{FS: errorfs.Wrap(mem, errorfs.ErrInjected.If(errorfs.OnIndex(1)))} opts.EnsureDefaults() opts.WithFSDefaults() + defer opts.private.fsCloser.Close() objSettings := objstorageprovider.DefaultSettings(opts.FS, "") // Prevent the provider from listing the dir (where we may get an injected error). objSettings.FSDirInitialListing = []string{} @@ -454,6 +465,7 @@ func TestIngestLinkFallback(t *testing.T) { } func TestOverlappingIngestedSSTs(t *testing.T) { + defer leaktest.AfterTest(t)() dir := "" var ( mem *vfs.MemFS @@ -1000,6 +1012,7 @@ func testIngestSharedImpl( } func TestIngestShared(t *testing.T) { + defer leaktest.AfterTest(t)() for _, strategy := range []remote.CreateOnSharedStrategy{remote.CreateOnSharedAll, remote.CreateOnSharedLower} { strategyStr := "all" if strategy == remote.CreateOnSharedLower { @@ -1016,6 +1029,7 @@ func TestIngestShared(t *testing.T) { } func TestSimpleIngestShared(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() var d *DB var provider2 objstorage.Provider @@ -1125,6 +1139,7 @@ type blockedCompaction struct { } func TestIngestExternal(t *testing.T) { + defer leaktest.AfterTest(t)() var mem vfs.FS var d, d1, d2 *DB var opts *Options @@ -1411,6 +1426,7 @@ func TestIngestExternal(t *testing.T) { } func TestIngestMemtableOverlaps(t *testing.T) { + defer leaktest.AfterTest(t)() comparers := []Comparer{ { Name: "default", @@ -1515,6 +1531,7 @@ func TestIngestMemtableOverlaps(t *testing.T) { } func TestKeyRangeBasic(t *testing.T) { + defer leaktest.AfterTest(t)() cmp := base.DefaultComparer.Compare k1 := KeyRange{Start: []byte("b"), End: []byte("c")} k1UserKeyBounds := k1.UserKeyBounds() @@ -1587,6 +1604,7 @@ func BenchmarkIngestOverlappingMemtable(b *testing.B) { } func TestIngestTargetLevel(t *testing.T) { + defer leaktest.AfterTest(t)() var d *DB defer func() { if d != nil { @@ -1679,6 +1697,7 @@ func TestIngestTargetLevel(t *testing.T) { } func TestIngest(t *testing.T) { + defer leaktest.AfterTest(t)() var mem vfs.FS var d *DB var flushed bool @@ -1823,6 +1842,7 @@ func (p linkAndRemovePredicate) Evaluate(op errorfs.Op) bool { var _ errorfs.Predicate = &linkAndRemovePredicate{} func TestIngestError(t *testing.T) { + defer leaktest.AfterTest(t)() for _, linkAndRemove := range []bool{false, true} { for i := int32(0); ; i++ { mem := vfs.NewMem() @@ -1940,6 +1960,7 @@ func TestIngestError(t *testing.T) { } func TestIngestIdempotence(t *testing.T) { + defer leaktest.AfterTest(t)() // Use an on-disk filesystem, because Ingest with a MemFS will copy, not // link the ingested file. dir, err := os.MkdirTemp("", "ingest-idempotence") @@ -1968,6 +1989,7 @@ func TestIngestIdempotence(t *testing.T) { } func TestIngestCompact(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() lel := MakeLoggingEventListener(&base.InMemLogger{}) d, err := Open("", &Options{ @@ -2011,6 +2033,7 @@ func TestIngestCompact(t *testing.T) { } func TestConcurrentIngest(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() d, err := Open("", &Options{ FS: mem, @@ -2058,6 +2081,7 @@ func TestConcurrentIngest(t *testing.T) { } func TestConcurrentIngestCompact(t *testing.T) { + defer leaktest.AfterTest(t)() for i := 0; i < 2; i++ { t.Run("", func(t *testing.T) { mem := vfs.NewMem() @@ -2175,6 +2199,7 @@ L6: } func TestIngestFlushQueuedMemTable(t *testing.T) { + defer leaktest.AfterTest(t)() // Verify that ingestion forces a flush of a queued memtable. mem := vfs.NewMem() @@ -2220,6 +2245,7 @@ func TestIngestFlushQueuedMemTable(t *testing.T) { } func TestIngestStats(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() d, err := Open("", &Options{ FS: mem, @@ -2253,6 +2279,7 @@ func TestIngestStats(t *testing.T) { } func TestIngestFlushQueuedLargeBatch(t *testing.T) { + defer leaktest.AfterTest(t)() // Verify that ingestion forces a flush of a queued large batch. mem := vfs.NewMem() @@ -2293,6 +2320,7 @@ func TestIngestFlushQueuedLargeBatch(t *testing.T) { } func TestIngestMemtablePendingOverlap(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() d, err := Open("", &Options{ FS: mem, @@ -2388,6 +2416,7 @@ func TestIngestMemtablePendingOverlap(t *testing.T) { // edits should contain new files with monotonically increasing sequence // numbers, since every flush and every ingest conflicts with one another. func TestIngestMemtableOverlapRace(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() el := MakeLoggingEventListener(testutils.Logger{T: t}) d, err := Open("", &Options{ @@ -2525,6 +2554,7 @@ func (fs noRemoveFS) Remove(string) error { } func TestIngestFileNumReuseCrash(t *testing.T) { + defer leaktest.AfterTest(t)() const count = 10 // Use an on-disk filesystem, because Ingest with a MemFS will copy, not // link the ingested file. @@ -2604,6 +2634,7 @@ func TestIngestFileNumReuseCrash(t *testing.T) { } func TestIngest_UpdateSequenceNumber(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() cmp := base.DefaultComparer.Compare parse := func(input string) (sstable.RawWriter, error) { @@ -2738,6 +2769,7 @@ func TestIngest_UpdateSequenceNumber(t *testing.T) { } func TestIngestCleanup(t *testing.T) { + defer leaktest.AfterTest(t)() fns := []base.TableNum{0, 1, 2} testCases := []struct { @@ -2837,6 +2869,7 @@ func (l *fatalCapturingLogger) Fatalf(_ string, args ...interface{}) { } func TestIngestValidation(t *testing.T) { + defer leaktest.AfterTest(t)() type keyVal struct { key, val []byte } diff --git a/iterator_histories_test.go b/iterator_histories_test.go index 52d7ed2f581..8f0a14e1d58 100644 --- a/iterator_histories_test.go +++ b/iterator_histories_test.go @@ -12,6 +12,7 @@ import ( "strings" "testing" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" @@ -29,6 +30,7 @@ import ( // fragmented spans is susceptible to races. func TestIterHistories(t *testing.T) { + defer leaktest.AfterTest(t)() datadriven.Walk(t, "testdata/iter_histories", func(t *testing.T, path string) { filename := filepath.Base(path) switch { diff --git a/iterator_test.go b/iterator_test.go index 6a0faf1208d..bd1d01148ec 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" @@ -167,6 +168,7 @@ func testIterator( } func TestIterator(t *testing.T) { + defer leaktest.AfterTest(t)() var merge Merge var kvs []base.InternalKV @@ -312,6 +314,7 @@ func (f *minSeqNumFilter) SyntheticSuffixIntersects(prop []byte, suffix []byte) } func TestReadSampling(t *testing.T) { + defer leaktest.AfterTest(t)() var d *DB defer func() { if d != nil { @@ -474,6 +477,7 @@ func TestReadSampling(t *testing.T) { } func TestIteratorTableFilter(t *testing.T) { + defer leaktest.AfterTest(t)() var d *DB defer func() { if d != nil { @@ -537,6 +541,7 @@ func TestIteratorTableFilter(t *testing.T) { } func TestIteratorNextPrev(t *testing.T) { + defer leaktest.AfterTest(t)() var mem vfs.FS var d *DB defer func() { @@ -598,6 +603,7 @@ func TestIteratorNextPrev(t *testing.T) { } func TestIteratorStats(t *testing.T) { + defer leaktest.AfterTest(t)() var mem vfs.FS var d *DB defer func() { @@ -676,6 +682,7 @@ func (i *iterSeekOptWrapper) SeekPrefixGE( } func TestIteratorSeekOpt(t *testing.T) { + defer leaktest.AfterTest(t)() var d *DB defer func() { require.NoError(t, d.Close()) @@ -843,6 +850,7 @@ func (i *errorSeekIter) Error() error { } func TestIteratorSeekOptErrors(t *testing.T) { + defer leaktest.AfterTest(t)() var kvs []base.InternalKV var errorIter errorSeekIter @@ -935,6 +943,7 @@ func (bi *testBlockIntervalMapper) MapRangeKeys(span sstable.Span) (sstable.Bloc return sstable.BlockInterval{}, nil } func TestIteratorBlockIntervalFilter(t *testing.T) { + defer leaktest.AfterTest(t)() var mem vfs.FS var d *DB defer func() { @@ -1077,6 +1086,7 @@ func randKey(n int, rng *rand.Rand) ([]byte, int) { } func TestIteratorRandomizedBlockIntervalFilter(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() opts := &Options{ FS: mem, @@ -1149,6 +1159,7 @@ func TestIteratorRandomizedBlockIntervalFilter(t *testing.T) { } func TestIteratorGuaranteedDurable(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() opts := &Options{FS: mem} d, err := Open("", opts) @@ -1190,6 +1201,7 @@ func TestIteratorGuaranteedDurable(t *testing.T) { } func TestIteratorBoundsLifetimes(t *testing.T) { + defer leaktest.AfterTest(t)() rng := rand.New(rand.NewPCG(0, uint64(time.Now().UnixNano()))) d := newPointTestkeysDatabase(t, testkeys.Alpha(2)) defer func() { require.NoError(t, d.Close()) }() @@ -1324,6 +1336,7 @@ func TestIteratorBoundsLifetimes(t *testing.T) { } func TestIteratorStatsMerge(t *testing.T) { + defer leaktest.AfterTest(t)() s := IteratorStats{ ForwardSeekCount: [NumStatsKind]int{1, 2}, ReverseSeekCount: [NumStatsKind]int{3, 4}, @@ -1419,6 +1432,7 @@ func TestIteratorStatsMerge(t *testing.T) { // iterator and constructing a new iterator with NewIter. The long-lived // iterator and the new iterator should surface identical iterator states. func TestSetOptionsEquivalence(t *testing.T) { + defer leaktest.AfterTest(t)() seed := uint64(time.Now().UnixNano()) // Call a helper function with the seed so that the seed appears within // stack traces if there's a panic. @@ -2044,6 +2058,7 @@ func BenchmarkBlockPropertyFilter(b *testing.B) { } func TestRangeKeyMaskingRandomized(t *testing.T) { + defer leaktest.AfterTest(t)() seed := *seed if seed == 0 { seed = uint64(time.Now().UnixNano()) @@ -2268,6 +2283,7 @@ func TestRangeKeyMaskingRandomized(t *testing.T) { } func TestIteratorSeekPrefixGERandomized(t *testing.T) { + defer leaktest.AfterTest(t)() seed := uint64(time.Now().UnixNano()) t.Logf("seed: %d", seed) rng := rand.New(rand.NewPCG(seed, seed)) diff --git a/level_checker_test.go b/level_checker_test.go index 261e5f8368c..5ca32aac823 100644 --- a/level_checker_test.go +++ b/level_checker_test.go @@ -14,6 +14,7 @@ import ( "strings" "testing" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" @@ -33,6 +34,7 @@ import ( ) func TestCheckLevelsBasics(t *testing.T) { + defer leaktest.AfterTest(t)() testCases := []string{"db-stage-1", "db-stage-2", "db-stage-3", "db-stage-4"} for _, tc := range testCases { t.Run(tc, func(t *testing.T) { @@ -89,6 +91,7 @@ func (f *failMerger) Close() error { } func TestCheckLevelsCornerCases(t *testing.T) { + defer leaktest.AfterTest(t)() if invariants.Enabled { t.Skip("disabled under invariants; relies on violating invariants to detect them") } @@ -303,6 +306,7 @@ func TestCheckLevelsCornerCases(t *testing.T) { }) } func TestPerformValidationForSSTableFailures(t *testing.T) { + defer leaktest.AfterTest(t)() const tableNum = base.TableNum(1) // [] diff --git a/level_iter_test.go b/level_iter_test.go index 344e834fc15..2840ae4662f 100644 --- a/level_iter_test.go +++ b/level_iter_test.go @@ -13,6 +13,7 @@ import ( "testing" "time" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/bloom" @@ -36,6 +37,7 @@ const ( ) func TestLevelIter(t *testing.T) { + defer leaktest.AfterTest(t)() var iterKVs [][]base.InternalKV var files manifest.LevelSlice @@ -324,6 +326,7 @@ func (lt *levelIterTest) runBuild(d *datadriven.TestData) string { } func TestLevelIterBoundaries(t *testing.T) { + defer leaktest.AfterTest(t)() lt := newLevelIterTest() defer lt.runClear() @@ -478,6 +481,7 @@ func (i *levelIterTestIter) Prev() *base.InternalKV { } func TestLevelIterSeek(t *testing.T) { + defer leaktest.AfterTest(t)() lt := newLevelIterTest() defer lt.runClear() diff --git a/lsm_view_test.go b/lsm_view_test.go index 52dae27d1e9..bd694748723 100644 --- a/lsm_view_test.go +++ b/lsm_view_test.go @@ -7,10 +7,12 @@ package pebble import ( "testing" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" ) func TestLSMViewURL(t *testing.T) { + defer leaktest.AfterTest(t)() datadriven.RunTest(t, "testdata/lsm_view", func(t *testing.T, td *datadriven.TestData) string { switch td.Cmd { diff --git a/mem_table_test.go b/mem_table_test.go index 5d98d55f7a6..dc869935f53 100644 --- a/mem_table_test.go +++ b/mem_table_test.go @@ -15,6 +15,7 @@ import ( "time" "unicode" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/arenaskl" @@ -82,6 +83,7 @@ func ikey(s string) InternalKey { } func TestMemTableBasic(t *testing.T) { + defer leaktest.AfterTest(t)() // Check the empty DB. m := newMemTable(memTableOptions{}) if got, want := m.count(), 0; got != want { @@ -132,6 +134,7 @@ func TestMemTableBasic(t *testing.T) { } func TestMemTableCount(t *testing.T) { + defer leaktest.AfterTest(t)() m := newMemTable(memTableOptions{}) for i := 0; i < 200; i++ { if j := m.count(); j != i { @@ -142,6 +145,7 @@ func TestMemTableCount(t *testing.T) { } func TestMemTableEmpty(t *testing.T) { + defer leaktest.AfterTest(t)() m := newMemTable(memTableOptions{}) if !m.empty() { t.Errorf("got !empty, want empty") @@ -154,6 +158,7 @@ func TestMemTableEmpty(t *testing.T) { } func TestMemTable1000Entries(t *testing.T) { + defer leaktest.AfterTest(t)() // Initialize the DB. const N = 1000 m0 := newMemTable(memTableOptions{}) @@ -227,6 +232,7 @@ func TestMemTable1000Entries(t *testing.T) { } func TestMemTableIter(t *testing.T) { + defer leaktest.AfterTest(t)() var mem *memTable for _, testdata := range []string{ "testdata/internal_iter_next", "testdata/internal_iter_bounds"} { @@ -274,6 +280,7 @@ func TestMemTableIter(t *testing.T) { } func TestMemTableDeleteRange(t *testing.T) { + defer leaktest.AfterTest(t)() var mem *memTable var seqNum base.SeqNum @@ -318,6 +325,7 @@ func TestMemTableDeleteRange(t *testing.T) { } func TestMemTableConcurrentDeleteRange(t *testing.T) { + defer leaktest.AfterTest(t)() // Concurrently write and read range tombstones. Workers add range // tombstones, and then immediately retrieve them verifying that the // tombstones they've added are all present. @@ -366,6 +374,7 @@ func TestMemTableConcurrentDeleteRange(t *testing.T) { } func TestMemTableReserved(t *testing.T) { + defer leaktest.AfterTest(t)() m := newMemTable(memTableOptions{size: 5000}) // Increase to 2 references. m.writerRef() @@ -381,6 +390,7 @@ func TestMemTableReserved(t *testing.T) { } func TestMemTable(t *testing.T) { + defer leaktest.AfterTest(t)() var m *memTable var buf bytes.Buffer batches := map[string]*Batch{} diff --git a/merging_iter_heap_test.go b/merging_iter_heap_test.go index 4689df6699c..dbfe24f776f 100644 --- a/merging_iter_heap_test.go +++ b/merging_iter_heap_test.go @@ -6,11 +6,13 @@ import ( "testing" "time" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/pebble/internal/base" "github.com/stretchr/testify/require" ) func TestMergingIterHeap(t *testing.T) { + defer leaktest.AfterTest(t)() seed := time.Now().UnixNano() t.Logf("Using seed %d", seed) rng := rand.New(rand.NewPCG(0, uint64(seed))) @@ -105,6 +107,7 @@ func TestMergingIterHeap(t *testing.T) { // with uniform random keys. There is a ~3.7% saving, with the following being // a representative result: cmp needed=104325 called=100416(frac=0.9625). func TestMergingIterHeapInit(t *testing.T) { + defer leaktest.AfterTest(t)() seed := time.Now().UnixNano() t.Logf("Using seed %d", seed) rng := rand.New(rand.NewPCG(0, uint64(seed))) diff --git a/merging_iter_test.go b/merging_iter_test.go index e555bbe2f60..4637ef79095 100644 --- a/merging_iter_test.go +++ b/merging_iter_test.go @@ -15,6 +15,7 @@ import ( "testing" "time" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/bloom" @@ -34,6 +35,7 @@ import ( ) func TestMergingIter(t *testing.T) { + defer leaktest.AfterTest(t)() var stats base.InternalIteratorStats newFunc := func(iters ...internalIterator) internalIterator { return newMergingIter(nil /* logger */, &stats, DefaultComparer.Compare, @@ -53,6 +55,7 @@ func TestMergingIter(t *testing.T) { } func TestMergingIterSeek(t *testing.T) { + defer leaktest.AfterTest(t)() var def string datadriven.RunTest(t, "testdata/merging_iter_seek", func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { @@ -84,6 +87,7 @@ func TestMergingIterSeek(t *testing.T) { } func TestMergingIterNextPrev(t *testing.T) { + defer leaktest.AfterTest(t)() // The data is the same in each of these cases, but divided up amongst the // iterators differently. This data must match the definition in // testdata/internal_iter_next. @@ -144,6 +148,7 @@ func TestMergingIterNextPrev(t *testing.T) { } func TestMergingIterDataDriven(t *testing.T) { + defer leaktest.AfterTest(t)() memFS := vfs.NewMem() cmp := DefaultComparer.Compare fmtKey := DefaultComparer.FormatKey diff --git a/metrics_test.go b/metrics_test.go index a8fa018a2b5..dfe3abf083c 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -17,6 +17,7 @@ import ( "time" "github.com/cockroachdb/crlib/crstrings" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/cache" @@ -212,6 +213,7 @@ func init() { } func TestMetrics(t *testing.T) { + defer leaktest.AfterTest(t)() if runtime.GOARCH == "386" { t.Skip("skipped on 32-bit due to slightly varied output") } @@ -552,6 +554,7 @@ func TestMetrics(t *testing.T) { } func TestMetricsWAmpDisableWAL(t *testing.T) { + defer leaktest.AfterTest(t)() d, err := Open("", &Options{FS: vfs.NewMem(), DisableWAL: true, Logger: testutils.Logger{T: t}}) require.NoError(t, err) ks := testkeys.Alpha(2) @@ -575,6 +578,7 @@ func TestMetricsWAmpDisableWAL(t *testing.T) { // Metrics.WAL.BytesWritten metric is always nondecreasing. // It's a regression test for issue #3505. func TestMetricsWALBytesWrittenMonotonicity(t *testing.T) { + defer leaktest.AfterTest(t)() fs := errorfs.Wrap(vfs.NewMem(), errorfs.RandomLatency( nil, 100*time.Microsecond, time.Now().UnixNano(), 0 /* no limit */)) d, err := Open("", &Options{ @@ -586,6 +590,7 @@ func TestMetricsWALBytesWrittenMonotonicity(t *testing.T) { MemTableSize: 1 << 20, /* 20 KiB */ }) require.NoError(t, err) + defer d.Close() stopCh := make(chan struct{}) diff --git a/obsolete_files_test.go b/obsolete_files_test.go index 923d0885606..1018bc5be77 100644 --- a/obsolete_files_test.go +++ b/obsolete_files_test.go @@ -13,6 +13,7 @@ import ( "testing" "time" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/testutils" @@ -22,6 +23,7 @@ import ( ) func TestCleaner(t *testing.T) { + defer leaktest.AfterTest(t)() dbs := make(map[string]*DB) defer func() { for _, db := range dbs { diff --git a/open.go b/open.go index 5d89d40188c..0d4a87a8d3f 100644 --- a/open.go +++ b/open.go @@ -207,6 +207,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) { d.compactionScheduler = newConcurrencyLimitScheduler(defaultTimeSource{}) } + var compactionSchedulerRegistered bool defer func() { // If an error or panic occurs during open, attempt to release the manually // allocated memory resources. Note that rather than look for an error, we @@ -238,6 +239,9 @@ func Open(dirname string, opts *Options) (db *DB, err error) { if d.mu.versions.manifestFile != nil { _ = d.mu.versions.manifestFile.Close() } + if compactionSchedulerRegistered { + d.compactionScheduler.Unregister() + } if r != nil { panic(r) } @@ -545,6 +549,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) { // d.maybeScheduleFlush, since completion of the flush can trigger // compactions. d.compactionScheduler.Register(2, d) + compactionSchedulerRegistered = true if !d.opts.ReadOnly { d.maybeScheduleFlush() for d.mu.compact.flushing { diff --git a/open_test.go b/open_test.go index d5a4e5ec773..877809052fd 100644 --- a/open_test.go +++ b/open_test.go @@ -29,6 +29,7 @@ import ( "testing" "time" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" "github.com/cockroachdb/metamorphic" @@ -52,6 +53,7 @@ import ( ) func TestOpenSharedFileCache(t *testing.T) { + defer leaktest.AfterTest(t)() c := cache.New(cacheDefaultSize) tc := NewFileCache(16, 100) defer tc.Unref() @@ -86,6 +88,7 @@ func TestOpenSharedFileCache(t *testing.T) { } func TestErrorIfExists(t *testing.T) { + defer leaktest.AfterTest(t)() opts := testingRandomized(t, &Options{ FS: vfs.NewMem(), ErrorIfExists: true, @@ -107,6 +110,7 @@ func TestErrorIfExists(t *testing.T) { } func TestErrorIfNotExists(t *testing.T) { + defer leaktest.AfterTest(t)() opts := testingRandomized(t, &Options{ FS: vfs.NewMem(), ErrorIfNotExists: true, @@ -131,6 +135,7 @@ func TestErrorIfNotExists(t *testing.T) { } func TestErrorIfNotPristine(t *testing.T) { + defer leaktest.AfterTest(t)() opts := testingRandomized(t, &Options{ FS: vfs.NewMem(), ErrorIfNotPristine: true, @@ -166,6 +171,7 @@ func TestErrorIfNotPristine(t *testing.T) { } func TestOpen_WALFailover(t *testing.T) { + defer leaktest.AfterTest(t)() filesystems := map[string]vfs.FS{} extractFSAndPath := func(cmdArg datadriven.CmdArg) (fs vfs.FS, dir string) { @@ -287,6 +293,7 @@ func TestOpen_WALFailover(t *testing.T) { // - Closes the database reopens the database (should now succeed). // - Closes and cleans up all locks. func TestOpenAlreadyLocked(t *testing.T) { + defer leaktest.AfterTest(t)() testCases := []struct { name string setupLocks func(opts *Options, dirname, walDirname, secondaryWalDirname string, fs vfs.FS) error @@ -497,6 +504,7 @@ func TestOpenAlreadyLocked(t *testing.T) { } func TestNewDBFilenames(t *testing.T) { + defer leaktest.AfterTest(t)() versions := map[FormatMajorVersion][]string{ internalFormatNewest: { "000002.log", @@ -633,6 +641,7 @@ func testOpenCloseOpenClose(t *testing.T, fs vfs.FS, root string) { } func TestOpenCloseOpenClose(t *testing.T) { + defer leaktest.AfterTest(t)() for _, fstype := range []string{"disk", "mem"} { t.Run(fstype, func(t *testing.T) { var fs vfs.FS @@ -656,6 +665,7 @@ func TestOpenCloseOpenClose(t *testing.T) { } func TestOpenOptionsCheck(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() opts := testingRandomized(t, &Options{FS: mem}) @@ -682,6 +692,7 @@ func TestOpenOptionsCheck(t *testing.T) { } func TestOpenCrashWritingOptions(t *testing.T) { + defer leaktest.AfterTest(t)() memFS := vfs.NewMem() d, err := Open("", &Options{FS: memFS}) @@ -729,6 +740,7 @@ func (f optionsTornWriteFile) Write(b []byte) (int, error) { } func TestOpenReadOnly(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() { @@ -849,6 +861,7 @@ func TestOpenReadOnly(t *testing.T) { } func TestOpenWALReplay(t *testing.T) { + defer leaktest.AfterTest(t)() largeValue := []byte(strings.Repeat("a", 100<<10)) hugeValue := []byte(strings.Repeat("b", 10<<20)) checkIter := func(iter *Iterator, err error) { @@ -929,6 +942,7 @@ func TestOpenWALReplay(t *testing.T) { // Reproduction for https://github.com/cockroachdb/pebble/issues/2234. func TestWALReplaySequenceNumBug(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() d, err := Open("", testingRandomized(t, &Options{ FS: mem, @@ -990,6 +1004,7 @@ func TestWALReplaySequenceNumBug(t *testing.T) { // memtable has been flushed. We test all 3 reasons for flushing: forced, size, // and large-batch. func TestOpenWALReplay2(t *testing.T) { + defer leaktest.AfterTest(t)() for _, readOnly := range []bool{false, true} { t.Run(fmt.Sprintf("read-only=%t", readOnly), func(t *testing.T) { for _, reason := range []string{"forced", "size", "large-batch"} { @@ -1052,6 +1067,8 @@ func TestOpenWALReplay2(t *testing.T) { // first WAL because otherwise we may violate point-in-time recovery // semantics. See #864. func TestTwoWALReplayCorrupt(t *testing.T) { + // TODO(radu): fix goroutine leak from diskHealthCheckingFS. + // defer leaktest.AfterTest(t)() // Use the real filesystem so that we can seek and overwrite WAL data // easily. dir, err := os.MkdirTemp("", "wal-replay") @@ -1112,6 +1129,7 @@ func TestTwoWALReplayCorrupt(t *testing.T) { // new WAL is created, the current manifest's MinUnflushedLogNum must be // higher than the previous WAL. func TestCrashOpenCrashAfterWALCreation(t *testing.T) { + defer leaktest.AfterTest(t)() fs := vfs.NewCrashableMem() getLogs := func() (logs []string) { @@ -1268,6 +1286,7 @@ func (d *crashAfterLogCreationDir) Sync() error { // // See cockroachdb/cockroach#48660. func TestOpenWALReplayReadOnlySeqNums(t *testing.T) { + defer leaktest.AfterTest(t)() const root = "" mem := vfs.NewMem() @@ -1336,6 +1355,7 @@ func TestOpenWALReplayReadOnlySeqNums(t *testing.T) { } func TestOpenWALReplayMemtableGrowth(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() const memTableSize = 64 * 1024 * 1024 opts := &Options{ @@ -1360,6 +1380,7 @@ func TestOpenWALReplayMemtableGrowth(t *testing.T) { } func TestPeek(t *testing.T) { + defer leaktest.AfterTest(t)() // The file paths are UNIX-oriented. To avoid duplicating the test fixtures // just for Windows, just skip the tests on Windows. if runtime.GOOS == "windows" { @@ -1381,6 +1402,7 @@ func TestPeek(t *testing.T) { } func TestGetVersion(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() opts := &Options{ FS: mem, @@ -1439,6 +1461,7 @@ func TestGetVersion(t *testing.T) { // TestOpenNeverFlushed verifies that we can open a database that had an // ingestion but no other operations. func TestOpenNeverFlushed(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() sstFile, err := mem.Create("to-ingest.sst", vfs.WriteCategoryUnspecified) @@ -1472,6 +1495,7 @@ func TestOpenNeverFlushed(t *testing.T) { } func TestOpen_ErrorIfUnknownFormatVersion(t *testing.T) { + defer leaktest.AfterTest(t)() fs := vfs.NewMem() d, err := Open("", &Options{ FS: fs, @@ -1566,6 +1590,7 @@ func (f *closeTrackingFile) Close() error { } func TestCheckConsistency(t *testing.T) { + defer leaktest.AfterTest(t)() const dir = "./test" mem := vfs.NewMem() mem.MkdirAll(dir, 0755) @@ -1679,6 +1704,7 @@ func TestCheckConsistency(t *testing.T) { } func TestOpenRatchetsNextFileNum(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() memShared := remote.NewInMem() @@ -1719,10 +1745,11 @@ func TestOpenRatchetsNextFileNum(t *testing.T) { require.NoError(t, d.Set([]byte("bar2"), []byte("value"), nil)) require.NoError(t, d.Flush()) require.NoError(t, d.Compact(context.Background(), []byte("a"), []byte("z"), false)) - + require.NoError(t, d.Close()) } func TestMkdirAllAndSyncParents(t *testing.T) { + defer leaktest.AfterTest(t)() if filepath.Separator != '/' { t.Skip("skipping due to path separator") } @@ -1776,6 +1803,7 @@ func TestMkdirAllAndSyncParents(t *testing.T) { // // This test is partially a regression test for #3865. func TestWALFailoverRandomized(t *testing.T) { + defer leaktest.AfterTest(t)() seed := time.Now().UnixNano() t.Logf("seed %d", seed) rng := rand.New(rand.NewPCG(1, uint64(seed))) @@ -1956,6 +1984,8 @@ func runRandomizedCrashTest(t *testing.T, opts randomizedCrashTestOptions) { for o := 0; o < opts.numOps; o++ { nextRandomOp()() } + wg.Wait() + require.NoError(t, d.Close()) } // TestWALHardCrashRandomized is a randomized test exercising recovery in the @@ -1964,6 +1994,7 @@ func runRandomizedCrashTest(t *testing.T, opts randomizedCrashTestOptions) { // ensures that the resulting DB state opens successfully, and the contents of // the DB match the expectations based on the keys written. func TestWALHardCrashRandomized(t *testing.T) { + defer leaktest.AfterTest(t)() for i := 0; i < 4; i++ { func() { seed := time.Now().UnixNano() @@ -2002,6 +2033,7 @@ func TestWALHardCrashRandomized(t *testing.T) { } func TestWALCorruption(t *testing.T) { + defer leaktest.AfterTest(t)() fs := vfs.NewMem() d, err := Open("", testingRandomized(t, &Options{ FS: fs, @@ -2043,6 +2075,7 @@ func TestWALCorruption(t *testing.T) { } func TestWALCorruptionBitFlip(t *testing.T) { + defer leaktest.AfterTest(t)() fs := vfs.NewMem() d, err := Open("", testingRandomized(t, &Options{ FS: fs, diff --git a/options_test.go b/options_test.go index 8d5a3d32117..a07d87a8050 100644 --- a/options_test.go +++ b/options_test.go @@ -13,6 +13,7 @@ import ( "testing" "time" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" @@ -66,6 +67,7 @@ func testingRandomized(t testing.TB, o *Options) *Options { } func TestTargetFileSize(t *testing.T) { + defer leaktest.AfterTest(t)() opts := DefaultOptions() testCases := []struct { @@ -91,6 +93,7 @@ func TestTargetFileSize(t *testing.T) { } func TestDefaultOptionsString(t *testing.T) { + defer leaktest.AfterTest(t)() n := runtime.GOMAXPROCS(8) defer runtime.GOMAXPROCS(n) @@ -217,6 +220,7 @@ func TestDefaultOptionsString(t *testing.T) { } func TestOptionsCheckCompatibility(t *testing.T) { + defer leaktest.AfterTest(t)() storeDir := "/mnt/foo" opts := DefaultOptions() s := opts.String() @@ -351,6 +355,7 @@ func (testCleaner) String() string { } func TestOptionsParse(t *testing.T) { + defer leaktest.AfterTest(t)() testComparer := *DefaultComparer testComparer.Name = "test-comparer" testMerger := *DefaultMerger @@ -433,6 +438,7 @@ func TestOptionsParse(t *testing.T) { } func TestOptionsParseLevelNoQuotes(t *testing.T) { + defer leaktest.AfterTest(t)() withQuotes := ` [Options] [Level "1"] @@ -463,6 +469,7 @@ func TestOptionsParseLevelNoQuotes(t *testing.T) { } func TestOptionsParseInvalidLevel(t *testing.T) { + defer leaktest.AfterTest(t)() str := ` [Options] [Level 1] @@ -493,6 +500,7 @@ func TestOptionsParseInvalidLevel(t *testing.T) { } func TestOptionsParseComparerOverwrite(t *testing.T) { + defer leaktest.AfterTest(t)() // Test that an unrecognized comparer in the OPTIONS file does not nil out // the Comparer field. o := &Options{Comparer: testkeys.Comparer} @@ -503,6 +511,7 @@ comparer=unrecognized`, nil) } func TestOptionsValidate(t *testing.T) { + defer leaktest.AfterTest(t)() testCases := []struct { options string expected string @@ -552,6 +561,7 @@ func TestOptionsValidate(t *testing.T) { } func TestKeyCategories(t *testing.T) { + defer leaktest.AfterTest(t)() kc := MakeUserKeyCategories(base.DefaultComparer.Compare, []UserKeyCategory{ {Name: "b", UpperBound: []byte("b")}, {Name: "dd", UpperBound: []byte("dd")}, @@ -614,6 +624,7 @@ func TestKeyCategories(t *testing.T) { } func TestApplyDBCompressionSettings(t *testing.T) { + defer leaktest.AfterTest(t)() var o Options o.testingRandomized(t) @@ -637,6 +648,7 @@ func TestApplyDBCompressionSettings(t *testing.T) { } func TestStaticSpanPolicyFunc(t *testing.T) { + defer leaktest.AfterTest(t)() var buf bytes.Buffer datadriven.RunTest(t, "testdata/static_span_policy_func", func(t *testing.T, td *datadriven.TestData) string { buf.Reset() diff --git a/range_del_test.go b/range_del_test.go index 3202016a759..d8eb22ea30e 100644 --- a/range_del_test.go +++ b/range_del_test.go @@ -15,6 +15,7 @@ import ( "testing" "time" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/buildtags" @@ -28,6 +29,7 @@ import ( ) func TestRangeDel(t *testing.T) { + defer leaktest.AfterTest(t)() var d *DB defer func() { if d != nil { @@ -101,6 +103,7 @@ func TestRangeDel(t *testing.T) { } func TestFlushDelay(t *testing.T) { + defer leaktest.AfterTest(t)() opts := &Options{ FS: vfs.NewMem(), Comparer: testkeys.Comparer, @@ -200,6 +203,7 @@ func TestFlushDelay(t *testing.T) { } func TestFlushDelayStress(t *testing.T) { + defer leaktest.AfterTest(t)() rng := rand.New(rand.NewPCG(0, uint64(time.Now().UnixNano()))) opts := &Options{ FS: vfs.NewMem(), @@ -261,6 +265,7 @@ func TestFlushDelayStress(t *testing.T) { // problem is that range tombstones are not truncated to sstable boundaries on // disk, only in memory. func TestRangeDelCompactionTruncation(t *testing.T) { + defer leaktest.AfterTest(t)() runTest := func(formatVersion FormatMajorVersion) { // Use a small target file size so that there is a single key per sstable. d, err := Open("", &Options{ @@ -410,6 +415,7 @@ L3: // sstable expanding to overlap its left neighbor if we failed to truncate an // sstable's boundaries to the compaction input boundaries. func TestRangeDelCompactionTruncation2(t *testing.T) { + defer leaktest.AfterTest(t)() // Use a small target file size so that there is a single key per sstable. d, err := Open("", &Options{ FS: vfs.NewMem(), @@ -471,6 +477,7 @@ L6: // TODO(peter): rewrite this test, TestRangeDelCompactionTruncation, and // TestRangeDelCompactionTruncation2 as data-driven tests. func TestRangeDelCompactionTruncation3(t *testing.T) { + defer leaktest.AfterTest(t)() // Use a small target file size so that there is a single key per sstable. d, err := Open("tmp", &Options{ Cleaner: ArchiveCleaner{}, diff --git a/scan_internal_test.go b/scan_internal_test.go index c4cf876fce0..d7571003f97 100644 --- a/scan_internal_test.go +++ b/scan_internal_test.go @@ -14,6 +14,7 @@ import ( "testing" "time" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/bloom" @@ -31,6 +32,7 @@ import ( ) func TestScanStatistics(t *testing.T) { + defer leaktest.AfterTest(t)() var d *DB type scanInternalReader interface { ScanStatistics( @@ -210,6 +212,7 @@ func TestScanStatistics(t *testing.T) { } func TestScanInternal(t *testing.T) { + defer leaktest.AfterTest(t)() var d *DB type scanInternalReader interface { ScanInternal(context.Context, ScanInternalOptions) error @@ -590,6 +593,7 @@ func TestScanInternal(t *testing.T) { } func TestPointCollapsingIter(t *testing.T) { + defer leaktest.AfterTest(t)() var def string datadriven.RunTest(t, "testdata/point_collapsing_iter", func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { diff --git a/snapshot_test.go b/snapshot_test.go index f3a9b4b33f3..91ed88700d7 100644 --- a/snapshot_test.go +++ b/snapshot_test.go @@ -17,6 +17,7 @@ import ( "time" "github.com/cockroachdb/crlib/crstrings" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" @@ -26,6 +27,7 @@ import ( ) func TestSnapshotListToSlice(t *testing.T) { + defer leaktest.AfterTest(t)() testCases := []struct { vals []base.SeqNum }{ @@ -201,12 +203,14 @@ func testSnapshotImpl(t *testing.T, newSnapshot func(d *DB) Reader) { } func TestSnapshot(t *testing.T) { + defer leaktest.AfterTest(t)() testSnapshotImpl(t, func(d *DB) Reader { return d.NewSnapshot() }) } func TestEventuallyFileOnlySnapshot(t *testing.T) { + defer leaktest.AfterTest(t)() testSnapshotImpl(t, func(d *DB) Reader { // NB: all keys in testdata/snapshot fall within the ASCII keyrange a-z. return d.NewEventuallyFileOnlySnapshot([]KeyRange{{Start: []byte("a"), End: []byte("z")}}) @@ -214,6 +218,7 @@ func TestEventuallyFileOnlySnapshot(t *testing.T) { } func TestSnapshotClosed(t *testing.T) { + defer leaktest.AfterTest(t)() d, err := Open("", &Options{ FS: vfs.NewMem(), Logger: testutils.Logger{T: t}, @@ -240,6 +245,7 @@ func TestSnapshotClosed(t *testing.T) { } func TestSnapshotRangeDeletionStress(t *testing.T) { + defer leaktest.AfterTest(t)() const runs = 200 const middleKey = runs * runs @@ -325,6 +331,7 @@ func TestSnapshotRangeDeletionStress(t *testing.T) { // that occurred between the reading of the sequence number and appending the // snapshot could drop keys required by the snapshot. func TestNewSnapshotRace(t *testing.T) { + defer leaktest.AfterTest(t)() const runs = 10 d, err := Open("", &Options{ FS: vfs.NewMem(), @@ -386,6 +393,7 @@ func TestNewSnapshotRace(t *testing.T) { // Test for fix to https://github.com/cockroachdb/pebble/issues/5390. func TestEFOSAndExciseRace(t *testing.T) { + defer leaktest.AfterTest(t)() o := &Options{ FS: vfs.NewMem(), L0CompactionThreshold: 10, diff --git a/table_stats_test.go b/table_stats_test.go index e4ab16a758c..af9e3743771 100644 --- a/table_stats_test.go +++ b/table_stats_test.go @@ -13,6 +13,7 @@ import ( "strings" "testing" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/keyspan" @@ -27,6 +28,7 @@ import ( ) func TestTableStats(t *testing.T) { + defer leaktest.AfterTest(t)() // loadedInfo is protected by d.mu. var loadedInfo *TableStatsInfo mkOpts := func() *Options { @@ -200,6 +202,7 @@ func TestTableStats(t *testing.T) { } func TestTableRangeDeletionIter(t *testing.T) { + defer leaktest.AfterTest(t)() var m *manifest.TableMetadata cmp := testkeys.Comparer keySchema := colblk.DefaultKeySchema(cmp, 16) @@ -285,6 +288,7 @@ func TestTableRangeDeletionIter(t *testing.T) { // depend on blob or table properties are the same after the store is reopened // and the initial stats are loaded. func TestStatsAfterReopen(t *testing.T) { + defer leaktest.AfterTest(t)() opts := &Options{ DisableAutomaticCompactions: true, FS: vfs.NewMem(), @@ -336,4 +340,5 @@ func TestStatsAfterReopen(t *testing.T) { if before != after { t.Errorf("metrics differ.\nbefore:\n%s\nafter:\n%s", before, after) } + require.NoError(t, d.Close()) } diff --git a/version_set_test.go b/version_set_test.go index ef19fb9cd1f..9f5d5acbfce 100644 --- a/version_set_test.go +++ b/version_set_test.go @@ -21,6 +21,7 @@ import ( "unsafe" "github.com/cockroachdb/crlib/crstrings" + "github.com/cockroachdb/crlib/testutils/leaktest" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/manifest" @@ -48,6 +49,7 @@ func writeAndIngest(t *testing.T, mem vfs.FS, d *DB, k InternalKey, v []byte, fi } func TestVersionSet(t *testing.T) { + defer leaktest.AfterTest(t)() opts := &Options{ FS: vfs.NewMem(), Comparer: base.DefaultComparer, @@ -270,6 +272,7 @@ func TestVersionSet(t *testing.T) { } func TestVersionSetCheckpoint(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() require.NoError(t, mem.MkdirAll("ext", 0755)) @@ -303,6 +306,7 @@ func TestVersionSetCheckpoint(t *testing.T) { } func TestVersionSetSeqNums(t *testing.T) { + defer leaktest.AfterTest(t)() mem := vfs.NewMem() require.NoError(t, mem.MkdirAll("ext", 0755)) @@ -366,6 +370,7 @@ func TestVersionSetSeqNums(t *testing.T) { // // See #4518. func TestLargeKeys(t *testing.T) { + defer leaktest.AfterTest(t)() var largeKeyComparer = func() base.Comparer { c := *testkeys.Comparer c.FormatKey = func(key []byte) fmt.Formatter { @@ -392,6 +397,9 @@ func TestLargeKeys(t *testing.T) { DisableTableStats: true, } var err error + if d != nil { + require.NoError(t, d.Close()) + } d, err = runDBDefineCmd(td, opts) require.NoError(t, err) return runLSMCmd(td, d) @@ -503,6 +511,7 @@ func splitAt(s string, chars string) (string, string) { // across multiple 32KiB blocks within the record package's encoding. There have // previously been issues specifically decoding these multi-block version edits. func TestCrashDuringManifestWrite_LargeKeys(t *testing.T) { + defer leaktest.AfterTest(t)() seed := rand.Uint64() t.Logf("seed: %d", seed) rng := rand.New(rand.NewPCG(seed, 0))