Commit c87bff7

Add use_direct_reads_for_compaction option
Introduces a new DBOption `use_direct_reads_for_compaction` (default false) that lets users route compaction background reads through O_DIRECT while keeping user reads on the buffered/page-cache path. Sequential compaction reads otherwise pollute the OS page cache with read-once data that evicts the hot user-read working set; bypassing the cache for those reads protects user-read tail latency on write-heavy workloads without forcing users onto the global `use_direct_reads` path (which slows user reads dramatically).

A naive implementation that only flipped the FileOptions returned by `OptimizeForCompactionTableRead` does not actually trigger the OS-level O_DIRECT open, because the TableCache (and FileMetaData::pinned_reader) already holds long-lived buffered handles opened at flush time or at DB::Open via LoadTableHandlers. Compaction would silently reuse those cached buffered handles and the kernel would never see the O_DIRECT flag.

The fix opens ephemeral O_DIRECT handles for the lifetime of the compaction scan, separate from the cache:

* TableCache::FindTable / NewIterator learn a `bypass_cache_for_scan` mode. When set, the pinned-reader fast path and the shared cache are skipped, GetTableReader is called directly with the caller's FileOptions, and ownership of the freshly opened TableReader is handed back to the caller. The iterator takes ownership via RegisterCleanup and frees the reader on destruction.
* VersionSet::MakeInputIterator and LevelIterator plumb the flag through both the L0 and L1+ compaction-input paths.
* CompactionJob::CreateInputIterator (on the ProcessKeyValueCompaction path) enables the flag exactly when `use_direct_reads_for_compaction` is on, the global `use_direct_reads` is off, and `OptimizeForCompactionTableRead` actually produced `use_direct_reads=true` in the compaction-read FileOptions.
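The ownership hand-off described in the first bullet can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyTableReader`, `ToyIterator`, and `NewBypassIterator` are hypothetical names invented for illustration, and the cleanup hook takes a `std::function` rather than the function-pointer-plus-arguments form used by the real code. The point it shows is that a reader opened outside any cache lives exactly as long as the iterator that scans it.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for a table reader. It counts live instances so the
// lifetime of an ephemeral reader can be observed.
struct ToyTableReader {
  static int live;
  ToyTableReader() { ++live; }
  ~ToyTableReader() { --live; }
};
int ToyTableReader::live = 0;

// Hypothetical iterator with a RegisterCleanup-style hook: every registered
// callback runs when the iterator is destroyed.
class ToyIterator {
 public:
  ToyIterator() = default;
  ToyIterator(const ToyIterator&) = delete;
  ToyIterator& operator=(const ToyIterator&) = delete;
  ~ToyIterator() {
    for (auto& fn : cleanups_) fn();
  }
  void RegisterCleanup(std::function<void()> fn) {
    cleanups_.push_back(std::move(fn));
  }

 private:
  std::vector<std::function<void()>> cleanups_;
};

// Bypass mode (assumed shape): open a fresh reader that is never inserted
// into a shared cache, and let the iterator free it on destruction.
std::unique_ptr<ToyIterator> NewBypassIterator() {
  auto iter = std::make_unique<ToyIterator>();
  auto* reader = new ToyTableReader();
  iter->RegisterCleanup([reader] { delete reader; });
  return iter;
}
```

Destroying the iterator returned by `NewBypassIterator()` is the only thing that releases the reader, which is the property the real bypass path needs so that the O_DIRECT handle does not outlive the compaction scan.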
An end-to-end test in db_compaction_test.cc uses the existing `NewRandomAccessFile:O_DIRECT` sync point in env/fs_posix.cc to assert that the kernel-level open really happens for compaction inputs when the flag is set, and never fires when the flag is off. The test is scoped to platforms that use the O_DIRECT path.

A small unrelated convenience also lands here: a new db_bench flag `--bgwriter_num` that lets the writer thread in readwhilewriting use a wider keyspace than the readers. This is what made it possible to benchmark the new option realistically -- the readers see a small hot subset (cache-resident), the writer spreads puts across the full DB which drives continuous compaction.

The new option follows the existing add_option.md checklist: it is registered in ImmutableDBOptions for serialization, surfaced through the C API, exposed in db_bench / db_stress / db_crashtest.py, randomized in RandomInitDBOptions, validated against allow_mmap_reads at Open time, and documented in unreleased_history. Java JNI is left for a follow-up.

Benchmark results
=================

Setup: Ubuntu 24.04 (kernel 7.0.5 OrbStack Linux VM on Apple Silicon), 14 vCPUs, virtio-blk disk. MGLRU disabled (echo 0 > /sys/kernel/mm/lru_gen/enabled). 14 GB DB (3.5M keys * 4 KB values), no compression. Each measurement run pinned to a 1 GB cgroup via `systemd-run --scope -p MemoryMax=1G -p MemorySwapMax=0`, so DB-to-cache ratio is ~14x. Page cache dropped between configs.

Workload: readwhilewriting for 180 s, 4 reader threads on a hot 2,000-key subset (~8 MB, ~3% of cache) + 1 writer thread spreading overwrites across the full 3.5M-key keyspace (via `--bgwriter_num=3500000`), throttled at 100 MB/s. Compaction ran at ~500 MB/s read/write during the buffered run, ~400 MB/s with direct compaction. Each run was 3 minutes long; "buffered" is the existing default.

| Config                                    | Throughput      | Read P50      | Read P99      | Read P99.9     | Read P99.99    |
|-------------------------------------------|-----------------|---------------|---------------|----------------|----------------|
| buffered (default)                        | 406 K ops/s     | 7.34 us       | 79.11 us      | 533.14 us      | 1647.79 us     |
| direct_compaction_read_write              | **464 K ops/s** | **6.37 us**   | **71.64 us**  | **468.28 us**  | **1363.91 us** |
|                                           | (+14%)          | (-13%)        | (-9%)         | (-12%)         | (-17%)         |
| direct_compaction_read_only               | 421 K ops/s     | 6.99 us       | 88.95 us      | 504.32 us      | 1456.75 us     |
|                                           | (+4%)           | (-5%)         | (+13%)        | (-5%)          | (-12%)         |
| use_direct_reads = true (existing global) | 442 K ops/s     | 7.37 us       | 50.82 us      | 472.23 us      | 1626.77 us     |
|                                           | (+9%)           | (0%)          | (-36%)        | (-11%)         | (-1%)          |

The recommended production configuration is `use_direct_reads_for_compaction = true` together with `use_direct_io_for_flush_and_compaction = true` ("direct reads + writes for compaction", the direct_compaction_read_write row). It beats the buffered default on every metric simultaneously: throughput up 14%, every read percentile from P50 to P99.99 down 9 to 17%. The existing global `use_direct_reads = true` flag does help P99 specifically (-36%), but its throughput (442 K ops/s) trails the recommended configuration (464 K ops/s) and its P99.99 is no better than the buffered default; the new compaction-only path is the better overall choice for the write-heavy workloads it is designed for. Higher DB-to-cache ratios (the Cassandra blog at https://lightfoot.dev/direct-i-o-for-cassandra-compaction-cutting-p99-read-latency-by-5x/ reports ~5x P99 improvement at a 43x ratio) should widen the gap further; the 14x ratio used above is what fit in a single laptop's disk budget.

Repro recipe
============

Setup:
- Install OrbStack on macOS or use any Linux host
- On macOS: orb create -t ubuntu rocksdb-bench
- Inside the Linux machine:

    apt-get install -y build-essential clang cmake git pkg-config \
      libgflags-dev libsnappy-dev zlib1g-dev libbz2-dev liblz4-dev \
      libzstd-dev rsync
    cmake -DCMAKE_BUILD_TYPE=Release -DPORTABLE=1 -DWITH_GFLAGS=1 \
      -DWITH_TESTS=0 .. && make -j db_bench

Build the source DB (once, unrestricted memory):

    ./db_bench --benchmarks=fillrandom,compact,waitforcompaction,stats \
      --db=/path/to/source_db --num=3500000 --key_size=16 \
      --value_size=4096 --write_buffer_size=16777216 \
      --target_file_size_base=16777216 --max_background_jobs=4 \
      --compression_type=none --cache_size=4194304 \
      --max_bytes_for_level_base=67108864 --disable_wal=1 --sync=0

Per-config measurement (copy source_db -> scratch_db first, then drop_caches, then run under cgroup):

    sudo systemd-run --scope -p MemoryMax=1G -p MemorySwapMax=0 \
      ./db_bench --use_existing_db=1 \
      --benchmarks=readwhilewriting,stats --db=/path/to/scratch_db \
      --threads=5 --duration=180 --statistics=true --histogram=1 \
      --num=2000 --bgwriter_num=3500000 \
      --key_size=16 --value_size=4096 \
      --write_buffer_size=16777216 --target_file_size_base=16777216 \
      --max_background_jobs=4 --compression_type=none \
      --cache_size=4194304 --open_files=200 \
      --skip_stats_update_on_db_open=true \
      --max_bytes_for_level_base=67108864 \
      --benchmark_write_rate_limit=104857600 \
      --rate_limiter_bytes_per_sec=0 \
      --use_direct_reads={true|false} \
      --use_direct_reads_for_compaction={true|false} \
      --use_direct_io_for_flush_and_compaction={true|false}

Disable MGLRU first so the kernel uses the classic active/inactive LRU:

    echo 0 | sudo tee /sys/kernel/mm/lru_gen/enabled
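
The parenthesized deltas in the benchmark table are relative changes against the buffered baseline. A minimal sketch of that arithmetic follows; `PercentChange` is a hypothetical helper introduced only for this illustration and is not part of db_bench. It assumes the deltas were rounded to the nearest whole percent.

```cpp
#include <cassert>
#include <cmath>

// Relative change of `value` versus `baseline`, rounded to the nearest whole
// percent. Positive means an increase, negative a reduction.
long PercentChange(double baseline, double value) {
  return std::lround((value - baseline) / baseline * 100.0);
}
```

For example, `PercentChange(406.0, 464.0)` reproduces the throughput delta of the direct_compaction_read_write row, and `PercentChange(1647.79, 1363.91)` reproduces its P99.99 delta.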
1 parent 87c554b commit c87bff7

26 files changed

Lines changed: 693 additions & 33 deletions

db/c.cc

Lines changed: 10 additions & 0 deletions
@@ -5038,6 +5038,16 @@ unsigned char rocksdb_options_get_use_direct_io_for_flush_and_compaction(
   return opt->rep.use_direct_io_for_flush_and_compaction;
 }
 
+void rocksdb_options_set_use_direct_reads_for_compaction(rocksdb_options_t* opt,
+                                                         unsigned char v) {
+  opt->rep.use_direct_reads_for_compaction = v;
+}
+
+unsigned char rocksdb_options_get_use_direct_reads_for_compaction(
+    rocksdb_options_t* opt) {
+  return opt->rep.use_direct_reads_for_compaction;
+}
+
 void rocksdb_options_set_allow_mmap_reads(rocksdb_options_t* opt,
                                           unsigned char v) {
   opt->rep.allow_mmap_reads = v;

db/compaction/compaction_job.cc

Lines changed: 28 additions & 1 deletion
@@ -203,6 +203,12 @@ CompactionJob::CompactionJob(
   assert(job_context);
   assert(job_context->snapshot_context_initialized);
 
+  // Expose the file options used for compaction reads so tests can confirm
+  // that `use_direct_reads_for_compaction` (and related flags) plumb all the
+  // way through to the read path.
+  TEST_SYNC_POINT_CALLBACK("CompactionJob::CompactionJob:FileOptionsForRead",
+                           &file_options_for_read_);
+
   const auto* cfd = compact_->compaction->column_family_data();
   ThreadStatusUtil::SetEnableTracking(db_options_.enable_thread_tracking);
   ThreadStatusUtil::SetColumnFamily(cfd);
@@ -1536,10 +1542,31 @@ InternalIterator* CompactionJob::CreateInputIterator(
 
   // Although the v2 aggregator is what the level iterator(s) know about,
   // the AddTombstones calls will be propagated down to the v1 aggregator.
+  //
+  // When `use_direct_reads_for_compaction` is set while the global
+  // `use_direct_reads` stays off, the shared TableCache is already holding
+  // buffered file handles for these SST files (opened that way for user
+  // reads). Reusing those handles would silently downgrade the compaction
+  // scan back to buffered I/O. Ask the iterator to open ephemeral
+  // O_DIRECT handles instead so the kernel actually bypasses the page
+  // cache for the compaction reads.
+  //
+  // The third clause (`file_options_for_read_.use_direct_reads`) is
+  // defensive: it confirms that `OptimizeForCompactionTableRead` actually
+  // requested direct I/O on the read FileOptions we will hand to the
+  // iterator. The base FileSystem implementation always sets it when the
+  // flag combination above is true, but a custom FileSystem could override
+  // OptimizeForCompactionTableRead without honoring the new flag -- in
+  // which case bypassing the cache would give us buffered handles anyway,
+  // which is wasteful. Skip the bypass in that case.
+  const bool bypass_cache_for_scan =
+      db_options_.use_direct_reads_for_compaction &&
+      !db_options_.use_direct_reads && file_options_for_read_.use_direct_reads;
   iterators.raw_input =
       std::unique_ptr<InternalIterator>(versions_->MakeInputIterator(
           read_options, sub_compact->compaction, sub_compact->RangeDelAgg(),
-          file_options_for_read_, boundaries.start, boundaries.end));
+          file_options_for_read_, boundaries.start, boundaries.end,
+          bypass_cache_for_scan));
   InternalIterator* input = iterators.raw_input.get();
 
   if (boundaries.start.has_value() || boundaries.end.has_value()) {
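
The three-clause gate in the hunk above is easiest to read as a truth table. The following is a minimal sketch, assuming trimmed-down stand-ins for the option structs: `ToyDBOptions`, `ToyFileOptions`, and `ShouldBypassCacheForScan` are hypothetical names used only here, and only the boolean expression mirrors the diff.

```cpp
#include <cassert>

// Hypothetical, trimmed-down stand-ins for the DB options and the
// compaction-read file options; only the fields used by the gate are kept.
struct ToyDBOptions {
  bool use_direct_reads = false;
  bool use_direct_reads_for_compaction = false;
};

struct ToyFileOptions {
  bool use_direct_reads = false;
};

// Bypass the shared table cache only when the compaction-only flag is on,
// the global flag is off (otherwise cached handles are already direct), and
// the file system actually requested direct I/O for compaction reads.
bool ShouldBypassCacheForScan(const ToyDBOptions& db,
                              const ToyFileOptions& read_fo) {
  return db.use_direct_reads_for_compaction && !db.use_direct_reads &&
         read_fo.use_direct_reads;
}
```

Only one of the eight input combinations returns true. In particular, turning on the global `use_direct_reads` disables the bypass, since in that case the cached handles need no replacement.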

db/db_compaction_test.cc

Lines changed: 315 additions & 0 deletions
@@ -6651,6 +6651,321 @@ TEST_P(DBCompactionDirectIOTest, DirectIO) {
 INSTANTIATE_TEST_CASE_P(DBCompactionDirectIOTest, DBCompactionDirectIOTest,
                         testing::Bool());
 
+// End-to-end check that `use_direct_reads_for_compaction` actually causes
+// compaction-input SST files to be opened with O_DIRECT, even though
+// `use_direct_reads` (the global flag) is left off so user reads stay
+// buffered. The assertion exercises the kernel-level path, not just the
+// FileOptions plumbing: the existing `NewRandomAccessFile:O_DIRECT` sync
+// point in env/fs_posix.cc fires once per fresh open that includes the
+// O_DIRECT flag.
+//
+// This test only runs on platforms that go through the O_DIRECT path
+// (Linux / non-BSD POSIX), since that is the configuration RocksDB users
+// actually deploy with the direct-I/O knobs. On other platforms it is
+// silently bypassed.
+#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS) && \
+    !defined(OS_WIN)
+TEST_F(DBCompactionTest, UseDirectReadsForCompactionEndToEnd) {
+  if (!IsDirectIOSupported()) {
+    ROCKSDB_GTEST_BYPASS("Direct IO not supported");
+    return;
+  }
+
+  Options options = CurrentOptions();
+  Destroy(options);
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+  // User reads stay buffered, compaction reads should switch to O_DIRECT.
+  options.use_direct_reads = false;
+  options.use_direct_reads_for_compaction = true;
+  // Isolate the read-side change; leave the compaction write path buffered.
+  options.use_direct_io_for_flush_and_compaction = false;
+
+  // Sync-point callbacks fire on compaction threads while assertions read
+  // these counters on the test thread. Use atomics to avoid a data race
+  // even when (as in this test) the workload is structured so the threads
+  // synchronize on TEST_WaitForCompact before reading.
+  std::atomic<int> observed_run_starts{0};
+  std::atomic<int> observed_odirect_opens{0};
+  std::atomic<bool> observed_direct_compaction_read{false};
+  std::atomic<int> observed_callbacks{0};
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({});
+  // Plumbing-level probe: the compaction-read FileOptions should carry
+  // use_direct_reads = true when the new flag is enabled.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionJob::CompactionJob:FileOptionsForRead", [&](void* arg) {
+        const auto* fo = static_cast<const FileOptions*>(arg);
+        observed_callbacks.fetch_add(1, std::memory_order_relaxed);
+        if (fo != nullptr && fo->use_direct_reads) {
+          observed_direct_compaction_read.store(true,
+                                                std::memory_order_relaxed);
+        }
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionJob::Run():Start", [&](void* /*arg*/) {
+        observed_run_starts.fetch_add(1, std::memory_order_relaxed);
+      });
+  // Kernel-level probe: this sync point fires only when the OS open() call
+  // is being issued with O_DIRECT in its flags. Hitting it proves we are
+  // actually changing the cache-mode for compaction reads, not just the
+  // in-memory FileOptions struct.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "NewRandomAccessFile:O_DIRECT", [&](void* /*arg*/) {
+        observed_odirect_opens.fetch_add(1, std::memory_order_relaxed);
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  Status s = TryReopen(options);
+  if (s.IsNotSupported() || s.IsInvalidArgument()) {
+    ROCKSDB_GTEST_BYPASS(
+        "Direct IO reads not supported in this test environment");
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+    return;
+  }
+  ASSERT_OK(s);
+
+  // Produce two L0 files with OVERLAPPING key ranges so that CompactRange has
+  // actual merge work to do (otherwise RocksDB performs a trivial file move
+  // and never constructs a CompactionJob).
+  const std::string value(4096, 'v');
+  for (int i = 0; i < 64; ++i) {
+    ASSERT_OK(Put(Key(i), value));
+  }
+  ASSERT_OK(Flush());
+  for (int i = 0; i < 64; ++i) {
+    ASSERT_OK(Put(Key(i), value));
+  }
+  ASSERT_OK(Flush());
+
+  // User reads should still go through the buffered path. Confirm that the
+  // option does not silently flip use_direct_reads for user reads.
+  for (int i = 0; i < 8; ++i) {
+    std::string actual;
+    ASSERT_OK(db_->Get(ReadOptions(), Key(i), &actual));
+    ASSERT_EQ(value, actual);
+  }
+
+  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  // Wait for compaction to complete and CompactionJob to be constructed.
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  // Diagnostic: confirm that the compaction actually ran. If it didn't, the
+  // missing FileOptions sync-point hits would be a test-infrastructure issue,
+  // not a regression in the new option.
+  ASSERT_GT(observed_run_starts.load(), 0)
+      << "CompactionJob::Run():Start never fired; CompactRange did not "
+         "schedule a compaction.";
+  ASSERT_GT(observed_callbacks.load(), 0);
+  ASSERT_TRUE(observed_direct_compaction_read.load());
+  // The headline assertion: at least one compaction-input file open went
+  // through the O_DIRECT path. Without the TableCache bypass plumbing this
+  // would be zero because compaction would silently reuse the buffered
+  // handles already cached for user reads.
+  EXPECT_GT(observed_odirect_opens.load(), 0)
+      << "no compaction-input opens went through O_DIRECT; "
+         "observed_odirect_opens="
+      << observed_odirect_opens.load();
+
+  // Quick sanity sweep after compaction to confirm data is intact.
+  for (int i = 0; i < 64; ++i) {
+    std::string actual;
+    ASSERT_OK(db_->Get(ReadOptions(), Key(i), &actual));
+    ASSERT_EQ(value, actual);
+  }
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+  Destroy(options);
+}
+
+// Confirms that when use_direct_reads_for_compaction is OFF, compaction reads
+// stay on the buffered path: neither the compaction-read FileOptions nor the
+// kernel-level O_DIRECT open should ever be triggered. Pairs with the test
+// above to cover both halves of the on/off switch.
+TEST_F(DBCompactionTest, UseDirectReadsForCompactionOffStaysBuffered) {
+  Options options = CurrentOptions();
+  Destroy(options);
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+  options.use_direct_reads = false;
+  options.use_direct_reads_for_compaction = false;
+  options.use_direct_io_for_flush_and_compaction = false;
+
+  std::atomic<bool> observed_direct_compaction_read{false};
+  std::atomic<int> observed_callbacks{0};
+  std::atomic<int> observed_odirect_opens{0};
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionJob::CompactionJob:FileOptionsForRead", [&](void* arg) {
+        const auto* fo = static_cast<const FileOptions*>(arg);
+        observed_callbacks.fetch_add(1, std::memory_order_relaxed);
+        if (fo->use_direct_reads) {
+          observed_direct_compaction_read.store(true,
+                                                std::memory_order_relaxed);
+        }
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "NewRandomAccessFile:O_DIRECT", [&](void* /*arg*/) {
+        observed_odirect_opens.fetch_add(1, std::memory_order_relaxed);
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(TryReopen(options));
+
+  const std::string value(4096, 'v');
+  for (int i = 0; i < 64; ++i) {
+    ASSERT_OK(Put(Key(i), value));
+  }
+  ASSERT_OK(Flush());
+  for (int i = 0; i < 64; ++i) {
+    ASSERT_OK(Put(Key(i), value));
+  }
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  ASSERT_GT(observed_callbacks.load(), 0);
+  ASSERT_FALSE(observed_direct_compaction_read.load());
+  ASSERT_EQ(0, observed_odirect_opens.load());
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+  Destroy(options);
+}
+
+// Exercise the LevelIterator bypass path (L1+ compactions) with range
+// tombstones present, which is where the ephemeral TableReader's lifetime
+// is non-trivially coupled to the range_tombstone_iter the file iterator
+// hands back. The end-to-end test above only constructs two L0 files,
+// which compact via the direct NewIterator path in MakeInputIterator and
+// never go through LevelIterator. This test populates data in L1 and L2,
+// adds range tombstones at each level, then triggers an L1->L2
+// compaction so LevelIterator::NewFileIterator is the one driving the
+// O_DIRECT bypass. If the TableReader lifetime were tied incorrectly to
+// the file iterator, the range-tombstone iterator created from the same
+// reader would either crash or be flagged by sanitizers when LevelIterator
+// transitions between files.
+TEST_F(DBCompactionTest,
+       UseDirectReadsForCompactionLevelIteratorWithTombstones) {
+  Options options = CurrentOptions();
+  Destroy(options);
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+  options.use_direct_reads = false;
+  options.use_direct_reads_for_compaction = true;
+  options.use_direct_io_for_flush_and_compaction = false;
+  // Small files / small level base so we can pack data into L1 and L2 with
+  // a few flushes and CompactRange calls instead of needing millions of keys.
+  options.write_buffer_size = 64 * 1024;
+  options.target_file_size_base = 64 * 1024;
+  options.max_bytes_for_level_base = 256 * 1024;
+  options.level0_file_num_compaction_trigger = 100;  // never auto-trigger
+
+  std::atomic<int> observed_odirect_opens{0};
+  std::atomic<int> observed_run_starts{0};
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "NewRandomAccessFile:O_DIRECT", [&](void* /*arg*/) {
+        observed_odirect_opens.fetch_add(1, std::memory_order_relaxed);
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionJob::Run():Start", [&](void* /*arg*/) {
+        observed_run_starts.fetch_add(1, std::memory_order_relaxed);
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  Status s = TryReopen(options);
+  if (s.IsNotSupported() || s.IsInvalidArgument()) {
+    ROCKSDB_GTEST_BYPASS(
+        "Direct IO reads not supported in this test environment");
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+    return;
+  }
+  ASSERT_OK(s);
+
+  const std::string value(1024, 'v');
+
+  auto write_batch = [&](int begin, int end, bool with_range_tombstone) {
+    for (int i = begin; i < end; ++i) {
+      ASSERT_OK(Put(Key(i), value));
+    }
+    if (with_range_tombstone) {
+      // Drop a slice in the middle of the just-written range. This puts a
+      // FragmentedRangeTombstone in the resulting SST file so the L1+
+      // compaction has actual tombstones to iterate over.
+      ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
+                                 Key(begin + (end - begin) / 4),
+                                 Key(begin + 3 * (end - begin) / 4)));
+    }
+    ASSERT_OK(Flush());
+  };
+
+  // Build up data in L0 across several files with range tombstones.
+  // Each batch produces one L0 SST.
+  for (int batch = 0; batch < 4; ++batch) {
+    write_batch(batch * 200, batch * 200 + 200, /*with_range_tombstone=*/true);
+  }
+  // Force everything down to L2 via two manual CompactRange calls so the
+  // file layout has SSTs at both L1 and L2 (or at least L2). The
+  // subsequent L0->L2 compaction will then exercise LevelIterator.
+  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  // Add another wave at L0 that overlaps with what is now at the lower
+  // levels, plus another range tombstone, so the next compaction has L1+
+  // inputs with tombstones.
+  for (int batch = 0; batch < 2; ++batch) {
+    write_batch(batch * 300 + 50, batch * 300 + 250,
+                /*with_range_tombstone=*/true);
+  }
+
+  const int run_starts_before = observed_run_starts.load();
+  const int odirect_before = observed_odirect_opens.load();
+
+  // The big one: compact everything together. This forces a LevelIterator
+  // to be constructed over the existing lower-level files with the bypass
+  // path. If the ephemeral TableReader / range-tombstone iter lifetimes
+  // are wrong, sanitizers should catch it here.
+  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  ASSERT_GT(observed_run_starts.load(), run_starts_before)
+      << "expected at least one compaction to run during the L1+ phase";
+  // The TableCache may already have ephemeral readers from the earlier
+  // compactions, so we just need *some* O_DIRECT opens overall.
+  EXPECT_GT(observed_odirect_opens.load(), odirect_before)
+      << "no compaction-input opens went through O_DIRECT during L1+ "
+         "compaction; LevelIterator bypass path may be broken";
+
+  // Sanity: the surviving (non-tombstoned) keys should still be readable
+  // and the tombstoned ones should be gone.
+  std::string actual;
+  for (int batch = 0; batch < 4; ++batch) {
+    int begin = batch * 200;
+    int end = batch * 200 + 200;
+    int del_lo = begin + (end - begin) / 4;
+    int del_hi = begin + 3 * (end - begin) / 4;
+    for (int i = begin; i < end; ++i) {
+      Status get_s = db_->Get(ReadOptions(), Key(i), &actual);
+      if (i >= del_lo && i < del_hi) {
+        // Could be NotFound (tombstoned) or overwritten by the second wave;
+        // both are acceptable -- we are exercising correctness of compaction,
+        // not the exact tombstone-vs-overwrite resolution here.
+        ASSERT_TRUE(get_s.ok() || get_s.IsNotFound());
+      } else {
+        ASSERT_TRUE(get_s.ok() || get_s.IsNotFound())
+            << "unexpected error reading key " << i << ": " << get_s.ToString();
+      }
+    }
+  }
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+  Destroy(options);
+}
+#endif  // !defined(OS_MACOSX) && !defined(OS_OPENBSD) && ...
+
 class CompactionPriTest : public DBTestBase,
                           public testing::WithParamInterface<uint32_t> {
  public:
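
The tests above share one pattern: a named callback bumps an atomic counter on a background thread, and the test thread reads the counter after the background work has finished. The sketch below shows that pattern in miniature. It is an illustration under stated assumptions: `ToySyncPoints`, `CountOpensOnWorker`, and the point name `"ToyOpen:O_DIRECT"` are hypothetical and are not the RocksDB SyncPoint API.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <thread>
#include <utility>

// Hypothetical miniature of a sync-point registry: callbacks are stored by
// name and invoked whenever instrumented code reaches that name.
class ToySyncPoints {
 public:
  void SetCallBack(const std::string& name, std::function<void(void*)> cb) {
    callbacks_[name] = std::move(cb);
  }
  void Process(const std::string& name, void* arg = nullptr) const {
    auto it = callbacks_.find(name);
    if (it != callbacks_.end()) it->second(arg);
  }

 private:
  std::map<std::string, std::function<void(void*)>> callbacks_;
};

// Simulates `opens` instrumented opens on a worker thread and returns how
// many times the callback fired. The atomic counter keeps the increments
// race-free; join() orders them before the final load.
int CountOpensOnWorker(int opens) {
  std::atomic<int> observed{0};
  ToySyncPoints points;
  points.SetCallBack("ToyOpen:O_DIRECT", [&](void* /*arg*/) {
    observed.fetch_add(1, std::memory_order_relaxed);
  });
  std::thread worker([&] {
    for (int i = 0; i < opens; ++i) points.Process("ToyOpen:O_DIRECT");
  });
  worker.join();
  return observed.load();
}
```

In this toy, `worker.join()` plays the role that `TEST_WaitForCompact` plays in the tests: the counter is read only after the instrumented work is done, and the callback is registered before the worker starts so the registry is never mutated concurrently.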
