Skip to content

Commit d1afa9b

Browse files
authored
feat: support cache mode for prefetch (#113)
1 parent e8b2dcb commit d1afa9b

12 files changed

Lines changed: 214 additions & 114 deletions

include/paimon/read_context.h

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ class PAIMON_EXPORT ReadContext {
5353
const std::shared_ptr<Executor>& executor,
5454
const std::shared_ptr<FileSystem>& specific_file_system,
5555
const std::map<std::string, std::string>& fs_scheme_to_identifier_map,
56-
const std::map<std::string, std::string>& options, bool enable_prefetch_cache,
57-
const CacheConfig& cache_config);
56+
const std::map<std::string, std::string>& options,
57+
PrefetchCacheMode prefetch_cache_mode, const CacheConfig& cache_config);
5858
~ReadContext();
5959

6060
const std::string& GetPath() const {
@@ -116,8 +116,8 @@ class PAIMON_EXPORT ReadContext {
116116
return specific_file_system_;
117117
}
118118

119-
bool EnablePrefetchCache() const {
120-
return enable_prefetch_cache_;
119+
PrefetchCacheMode GetPrefetchCacheMode() const {
120+
return prefetch_cache_mode_;
121121
}
122122

123123
const CacheConfig& GetCacheConfig() const {
@@ -142,7 +142,7 @@ class PAIMON_EXPORT ReadContext {
142142
std::shared_ptr<FileSystem> specific_file_system_;
143143
std::map<std::string, std::string> fs_scheme_to_identifier_map_;
144144
std::map<std::string, std::string> options_;
145-
bool enable_prefetch_cache_;
145+
PrefetchCacheMode prefetch_cache_mode_;
146146
CacheConfig cache_config_;
147147
};
148148

@@ -226,13 +226,13 @@ class PAIMON_EXPORT ReadContextBuilder {
226226
/// @return Reference to this builder for method chaining.
227227
ReadContextBuilder& EnablePrefetch(bool enabled);
228228

229-
/// Enable or disable prefetch cache for read operations.
229+
/// Set prefetch cache mode for read operations.
230230
///
231-
/// When enabled, a prefetch cache is used to prebuffer data ranges before they are needed,
231+
/// A prefetch cache is used to prebuffer data ranges before they are needed,
232232
/// which can improve read performance by reducing redundant I/O operations.
233-
/// @param enabled Whether to enable prefetch cache (default: true)
233+
/// @param mode (default: PrefetchCacheMode::ALWAYS)
234234
/// @return Reference to this builder for method chaining.
235-
ReadContextBuilder& EnablePrefetchCache(bool enabled);
235+
ReadContextBuilder& SetPrefetchCacheMode(PrefetchCacheMode mode);
236236

237237
/// Set the cache configuration for prefetch read operations.
238238
///

include/paimon/utils/read_ahead_cache.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,24 @@
3434

3535
namespace paimon {
3636

37+
/// PrefetchCacheMode
38+
/// Cache prefetch switch modes.
39+
/// Controls whether to enable cache prefetching under different circumstances, such as queries with
40+
/// predicates or bitmap indexes.
41+
///
42+
/// - ALWAYS: Enable cache in all scenarios.
43+
/// - EXCLUDE_PREDICATE: Disable cache when query has predicates.
44+
/// - EXCLUDE_BITMAP: Disable cache when using bitmap index.
45+
/// - EXCLUDE_BITMAP_OR_PREDICATE: Disable cache if query has predicates or bitmap index.
46+
/// - NEVER: Always disable cache.
47+
enum class PAIMON_EXPORT PrefetchCacheMode {
48+
ALWAYS = 1,
49+
EXCLUDE_PREDICATE = 2,
50+
EXCLUDE_BITMAP = 3,
51+
EXCLUDE_BITMAP_OR_PREDICATE = 4,
52+
NEVER = 5
53+
};
54+
3755
/// Configuration parameters for the read-ahead cache behavior.
3856
///
3957
/// This struct controls various limits and prefetching strategies used by

src/paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader_test.cpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,14 @@ class ApplyBitmapIndexBatchReaderTest : public ::testing::Test,
8888
bool enable_prefetch = GetParam();
8989
if (enable_prefetch) {
9090
MockFormatReaderBuilder reader_builder(data, target_type_, batch_size);
91-
ASSERT_OK_AND_ASSIGN(file_batch_reader,
92-
PrefetchFileBatchReaderImpl::Create(
93-
/*data_file_path=*/"DUMMY", &reader_builder, fs_,
94-
prefetch_batch_count, batch_size, prefetch_batch_count * 2,
95-
/*enable_adaptive_prefetch_strategy=*/false, executor_,
96-
/*initialize_read_ranges=*/true,
97-
/*enable_prefetch_cache=*/true, CacheConfig(), pool_));
91+
ASSERT_OK_AND_ASSIGN(
92+
file_batch_reader,
93+
PrefetchFileBatchReaderImpl::Create(
94+
/*data_file_path=*/"DUMMY", &reader_builder, fs_, prefetch_batch_count,
95+
batch_size, prefetch_batch_count * 2,
96+
/*enable_adaptive_prefetch_strategy=*/false, executor_,
97+
/*initialize_read_ranges=*/true,
98+
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(), pool_));
9899
} else {
99100
file_batch_reader =
100101
std::make_unique<MockFileBatchReader>(data, target_type_, batch_size);

src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ Result<std::unique_ptr<PrefetchFileBatchReaderImpl>> PrefetchFileBatchReaderImpl
4545
const std::shared_ptr<FileSystem>& fs, uint32_t prefetch_max_parallel_num, int32_t batch_size,
4646
uint32_t prefetch_batch_count, bool enable_adaptive_prefetch_strategy,
4747
const std::shared_ptr<Executor>& executor, bool initialize_read_ranges,
48-
bool enable_prefetch_cache, const CacheConfig& cache_config,
48+
PrefetchCacheMode prefetch_cache_mode, const CacheConfig& cache_config,
4949
const std::shared_ptr<MemoryPool>& pool) {
5050
if (prefetch_max_parallel_num == 0) {
5151
return Status::Invalid("prefetch max parallel num should be greater than 0.");
@@ -67,7 +67,7 @@ Result<std::unique_ptr<PrefetchFileBatchReaderImpl>> PrefetchFileBatchReaderImpl
6767
}
6868

6969
std::shared_ptr<ReadAheadCache> cache;
70-
if (enable_prefetch_cache) {
70+
if (prefetch_cache_mode != PrefetchCacheMode::NEVER) {
7171
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InputStream> input_stream, fs->Open(data_file_path));
7272
cache = std::make_shared<ReadAheadCache>(input_stream, cache_config, pool);
7373
}
@@ -102,9 +102,9 @@ Result<std::unique_ptr<PrefetchFileBatchReaderImpl>> PrefetchFileBatchReaderImpl
102102
}
103103
uint32_t prefetch_queue_capacity = prefetch_batch_count / readers.size();
104104

105-
auto reader = std::unique_ptr<PrefetchFileBatchReaderImpl>(
106-
new PrefetchFileBatchReaderImpl(readers, batch_size, prefetch_queue_capacity,
107-
enable_adaptive_prefetch_strategy, executor, cache));
105+
auto reader = std::unique_ptr<PrefetchFileBatchReaderImpl>(new PrefetchFileBatchReaderImpl(
106+
readers, batch_size, prefetch_queue_capacity, enable_adaptive_prefetch_strategy, executor,
107+
cache, prefetch_cache_mode));
108108
if (initialize_read_ranges) {
109109
// normally initialize read ranges should be false, as set read schema will refresh read
110110
// ranges, and set read schema will always be called before read.
@@ -116,11 +116,13 @@ Result<std::unique_ptr<PrefetchFileBatchReaderImpl>> PrefetchFileBatchReaderImpl
116116
PrefetchFileBatchReaderImpl::PrefetchFileBatchReaderImpl(
117117
const std::vector<std::shared_ptr<PrefetchFileBatchReader>>& readers, int32_t batch_size,
118118
uint32_t prefetch_queue_capacity, bool enable_adaptive_prefetch_strategy,
119-
const std::shared_ptr<Executor>& executor, const std::shared_ptr<ReadAheadCache>& cache)
119+
const std::shared_ptr<Executor>& executor, const std::shared_ptr<ReadAheadCache>& cache,
120+
PrefetchCacheMode cache_mode)
120121
: readers_(std::move(readers)),
121122
batch_size_(batch_size),
122123
executor_(executor),
123124
cache_(cache),
125+
cache_mode_(cache_mode),
124126
prefetch_queue_capacity_(prefetch_queue_capacity),
125127
enable_adaptive_prefetch_strategy_(enable_adaptive_prefetch_strategy) {
126128
for (size_t i = 0; i < readers_.size(); i++) {
@@ -146,6 +148,7 @@ Status PrefetchFileBatchReaderImpl::SetReadSchema(
146148
PAIMON_RETURN_NOT_OK(reader->SetReadSchema(c_schema.get(), predicate, selection_bitmap));
147149
}
148150
selection_bitmap_ = selection_bitmap;
151+
predicate_ = predicate;
149152
return RefreshReadRanges();
150153
}
151154

@@ -275,10 +278,28 @@ Status PrefetchFileBatchReaderImpl::CleanUp() {
275278
return Status::OK();
276279
}
277280

281+
bool PrefetchFileBatchReaderImpl::NeedInitCache() const {
282+
switch (cache_mode_) {
283+
case PrefetchCacheMode::NEVER:
284+
return false;
285+
case PrefetchCacheMode::EXCLUDE_PREDICATE:
286+
return predicate_ == nullptr;
287+
case PrefetchCacheMode::EXCLUDE_BITMAP:
288+
return selection_bitmap_ == std::nullopt;
289+
case PrefetchCacheMode::EXCLUDE_BITMAP_OR_PREDICATE:
290+
return predicate_ == nullptr && selection_bitmap_ == std::nullopt;
291+
case PrefetchCacheMode::ALWAYS:
292+
return true;
293+
default:
294+
assert(false);
295+
return true;
296+
}
297+
}
298+
278299
void PrefetchFileBatchReaderImpl::Workloop() {
279300
std::vector<std::future<void>> futures;
280301
futures.resize(readers_.size());
281-
if (cache_) {
302+
if (cache_ && NeedInitCache()) {
282303
auto read_ranges = readers_[0]->PreBufferRange();
283304
if (read_ranges.ok()) {
284305
std::vector<ByteRange> ranges;
@@ -288,11 +309,9 @@ void PrefetchFileBatchReaderImpl::Workloop() {
288309
auto s = cache_->Init(std::move(ranges));
289310
if (!s.ok()) {
290311
SetReadStatus(s);
291-
return;
292312
}
293313
} else {
294314
SetReadStatus(read_ranges.status());
295-
return;
296315
}
297316
}
298317

src/paimon/common/reader/prefetch_file_batch_reader_impl.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {
5858
const std::shared_ptr<FileSystem>& fs, uint32_t prefetch_max_parallel_num,
5959
int32_t batch_size, uint32_t prefetch_batch_count, bool enable_adaptive_prefetch_strategy,
6060
const std::shared_ptr<Executor>& executor, bool initialize_read_ranges,
61-
bool enable_prefetch_cache, const CacheConfig& cache_config,
61+
PrefetchCacheMode prefetch_cache_mode, const CacheConfig& cache_config,
6262
const std::shared_ptr<MemoryPool>& pool);
6363

6464
~PrefetchFileBatchReaderImpl() override;
@@ -111,7 +111,8 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {
111111
PrefetchFileBatchReaderImpl(
112112
const std::vector<std::shared_ptr<PrefetchFileBatchReader>>& readers, int32_t batch_size,
113113
uint32_t prefetch_queue_capacity, bool enable_adaptive_prefetch_strategy,
114-
const std::shared_ptr<Executor>& executor, const std::shared_ptr<ReadAheadCache>& cache);
114+
const std::shared_ptr<Executor>& executor, const std::shared_ptr<ReadAheadCache>& cache,
115+
PrefetchCacheMode cache_mode);
115116

116117
Status CleanUp();
117118
void Workloop();
@@ -134,6 +135,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {
134135
const std::pair<uint64_t, uint64_t>& read_range) const;
135136
Status HandleReadResult(size_t reader_idx, const std::pair<uint64_t, uint64_t>& read_range,
136137
FileBatchReader::ReadBatchWithBitmap&& read_batch_with_bitmap);
138+
bool NeedInitCache() const;
137139

138140
private:
139141
std::vector<std::shared_ptr<PrefetchFileBatchReader>> readers_;
@@ -143,6 +145,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {
143145
std::vector<std::unique_ptr<std::atomic<uint64_t>>> seek_cnt_;
144146
const int32_t batch_size_;
145147
std::optional<RoaringBitmap32> selection_bitmap_;
148+
std::shared_ptr<Predicate> predicate_;
146149
std::deque<std::pair<uint64_t, uint64_t>> read_ranges_;
147150
std::vector<std::vector<std::pair<uint64_t, uint64_t>>> read_ranges_in_group_;
148151
std::vector<std::unique_ptr<ThreadsafeQueue<PrefetchBatch>>> prefetch_queues_;
@@ -151,6 +154,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {
151154
std::condition_variable cv_;
152155
std::shared_ptr<Executor> executor_;
153156
std::shared_ptr<ReadAheadCache> cache_;
157+
PrefetchCacheMode cache_mode_;
154158

155159
mutable std::shared_mutex rw_mutex_;
156160
std::unique_ptr<std::thread> background_thread_;

0 commit comments

Comments
 (0)