-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Expand file tree
/
Copy pathinverted_index_cache.h
More file actions
286 lines (237 loc) · 10.6 KB
/
inverted_index_cache.h
File metadata and controls
286 lines (237 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <butil/macros.h>
#include <glog/logging.h>
#include <stddef.h>
#include <stdint.h>
#include <atomic>
#include <memory>
#include <roaring/roaring.hh>
#include <string>
#include "common/config.h"
#include "common/status.h"
#include "cpp/lru_cache.h"
#include "io/fs/file_system.h"
#include "io/fs/path.h"
#include "runtime/exec_env.h"
#include "runtime/memory/lru_cache_policy.h"
#include "runtime/memory/mem_tracker.h"
#include "storage/index/inverted/inverted_index_searcher.h"
#include "util/slice.h"
#include "util/time.h"
namespace doris {
namespace segment_v2 {
class InvertedIndexCacheHandle;

// Cache of opened index searchers, keyed by index file path.
// The LRU policy object backing the cache is owned through `_policy`.
// Member functions that are only declared here are defined outside this header.
class InvertedIndexSearcherCache {
public:
    // The cache key of the index_searcher lru cache.
    struct CacheKey {
        // Intentionally left non-explicit: marking it explicit would break any
        // caller that builds a key implicitly from a std::string.
        CacheKey(std::string index_file_path) : index_file_path(std::move(index_file_path)) {}
        std::string index_file_path;
    };

    // The cache value of the index_searcher lru cache.
    // Holds an opened index_searcher plus its bookkeeping fields.
    class CacheValue : public LRUCacheValueBase {
    public:
        IndexSearcherPtr index_searcher;
        // Size recorded for this entry (taken from `mem_size` in the constructor).
        size_t size = 0;
        // Timestamp of the last visit. InvertedIndexCacheHandle's destructor
        // overwrites it with a UnixMillis()-based value. Zero-initialized so a
        // default-constructed CacheValue never exposes an indeterminate value.
        int64_t last_visit_time = 0;

        CacheValue() = default;

        // searcher:   the opened searcher to keep in the cache.
        // mem_size:   stored into `size`.
        // visit_time: stored into `last_visit_time`.
        explicit CacheValue(IndexSearcherPtr searcher, size_t mem_size, int64_t visit_time)
                : index_searcher(std::move(searcher)),
                  size(mem_size),
                  last_visit_time(visit_time) {}
    };

    // Create global instance of this class.
    // "capacity" is the capacity of lru cache.
    static InvertedIndexSearcherCache* create_global_instance(size_t capacity,
                                                              uint32_t num_shards = 16);

    // Return global instance.
    // Client should call create_global_instance before.
    static InvertedIndexSearcherCache* instance() {
        return ExecEnv::GetInstance()->get_inverted_index_searcher_cache();
    }

    InvertedIndexSearcherCache(size_t capacity, uint32_t num_shards);

    // Insert `cache_value` under `cache_key`. The second overload additionally
    // takes a handle argument.
    // NOTE(review): ownership of `cache_value` is not visible from this header;
    // confirm against the out-of-line definition.
    void insert(const InvertedIndexSearcherCache::CacheKey& cache_key, CacheValue* cache_value);
    void insert(const InvertedIndexSearcherCache::CacheKey& cache_key, CacheValue* cache_value,
                InvertedIndexCacheHandle* handle);

    // Lookup the given index_searcher in the cache.
    // If the index_searcher is found, the cache entry will be written into handle.
    // Return true if entry is found, otherwise return false.
    bool lookup(const InvertedIndexSearcherCache::CacheKey& key, InvertedIndexCacheHandle* handle);

    // `erase` is called after compaction removes a segment.
    Status erase(const std::string& index_file_path);

    // Forward the release of `handle` to the underlying policy.
    void release(Cache::Handle* handle) { _policy->release(handle); }

    int64_t mem_consumption();

private:
    InvertedIndexSearcherCache() = default;

    // LRUCachePolicy specialization tagged INVERTEDINDEX_SEARCHER_CACHE.
    // Both constructors forward their arguments to LRUCachePolicy with
    // LRUCacheType::SIZE, the configured stale sweep time, pruning enabled and
    // LRU-K disabled.
    class InvertedIndexSearcherCachePolicy : public LRUCachePolicy {
    public:
        InvertedIndexSearcherCachePolicy(size_t capacity, uint32_t num_shards,
                                         uint32_t element_count_capacity)
                : LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, capacity,
                                 LRUCacheType::SIZE,
                                 config::inverted_index_cache_stale_sweep_time_sec, num_shards,
                                 element_count_capacity, /*enable_prune*/ true,
                                 /*is lru k*/ false) {}

        InvertedIndexSearcherCachePolicy(size_t capacity, uint32_t num_shards,
                                         uint32_t element_count_capacity,
                                         CacheValueTimeExtractor cache_value_time_extractor,
                                         bool cache_value_check_timestamp)
                : LRUCachePolicy(
                          CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, capacity,
                          LRUCacheType::SIZE, config::inverted_index_cache_stale_sweep_time_sec,
                          num_shards, element_count_capacity, cache_value_time_extractor,
                          cache_value_check_timestamp, /*enable_prune*/ true, /*is lru k*/ false) {}
    };

    // Insert a cache entry by key.
    // And the cache entry will be returned in handle.
    // This function is thread-safe.
    Cache::Handle* _insert(const InvertedIndexSearcherCache::CacheKey& key, CacheValue* value);

    std::unique_ptr<InvertedIndexSearcherCachePolicy> _policy;
};
// Uniquely-owning pointer (std::unique_ptr) to a searcher-cache value.
using IndexCacheValuePtr = std::unique_ptr<InvertedIndexSearcherCache::CacheValue>;
// A handle for a index_searcher from index_searcher lru cache.
// The handle can ensure that the index_searcher is valid
// and will not be closed while the holder of the handle is accessing the index_searcher.
// The handle will automatically release the cache entry when it is destroyed.
// So the caller need to make sure the handle is valid in lifecycle.
class InvertedIndexCacheHandle {
public:
InvertedIndexCacheHandle() = default;
InvertedIndexCacheHandle(LRUCachePolicy* cache, Cache::Handle* handle)
: _cache(cache), _handle(handle) {}
~InvertedIndexCacheHandle() {
if (_handle != nullptr) {
CHECK(_cache != nullptr);
// only after get_index_searcher call this destructor will
// add `config::index_cache_entry_stay_time_after_lookup_s` on last_visit_time,
// this is to extend the retention time of the entries hit by lookup.
((InvertedIndexSearcherCache::CacheValue*)_cache->value(_handle))->last_visit_time =
UnixMillis() + config::index_cache_entry_stay_time_after_lookup_s * 1000;
_cache->release(_handle);
}
}
InvertedIndexCacheHandle(InvertedIndexCacheHandle&& other) noexcept {
std::swap(_cache, other._cache);
std::swap(_handle, other._handle);
}
InvertedIndexCacheHandle& operator=(InvertedIndexCacheHandle&& other) noexcept {
std::swap(_cache, other._cache);
std::swap(_handle, other._handle);
return *this;
}
IndexSearcherPtr get_index_searcher() {
return ((InvertedIndexSearcherCache::CacheValue*)_cache->value(_handle))->index_searcher;
}
InvertedIndexSearcherCache::CacheValue* get_index_cache_value() {
return ((InvertedIndexSearcherCache::CacheValue*)_cache->value(_handle));
}
private:
LRUCachePolicy* _cache = nullptr;
Cache::Handle* _handle = nullptr;
// Don't allow copy and assign
DISALLOW_COPY_AND_ASSIGN(InvertedIndexCacheHandle);
};
class InvertedIndexQueryCacheHandle;
class InvertedIndexQueryCache : public LRUCachePolicy {
public:
using LRUCachePolicy::insert;
// cache key
struct CacheKey {
io::Path index_path; // index file path
std::string column_name; // column name
InvertedIndexQueryType query_type; // query type
std::string value; // query value
// Encode to a flat binary which can be used as LRUCache's key
std::string encode() const {
std::string key_buf(index_path.string());
key_buf.append("/");
key_buf.append(column_name);
key_buf.append("/");
auto query_type_str = query_type_to_string(query_type);
if (query_type_str.empty()) {
return "";
}
key_buf.append(query_type_str);
key_buf.append("/");
key_buf.append(value);
return key_buf;
}
};
class CacheValue : public LRUCacheValueBase {
public:
std::shared_ptr<roaring::Roaring> bitmap;
};
// Create global instance of this class
static InvertedIndexQueryCache* create_global_cache(size_t capacity, uint32_t num_shards = 16) {
auto* res = new InvertedIndexQueryCache(capacity, num_shards);
return res;
}
// Return global instance.
// Client should call create_global_cache before.
static InvertedIndexQueryCache* instance() {
return ExecEnv::GetInstance()->get_inverted_index_query_cache();
}
InvertedIndexQueryCache() = delete;
InvertedIndexQueryCache(size_t capacity, uint32_t num_shards)
: LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_QUERY_CACHE, capacity,
LRUCacheType::SIZE, config::inverted_index_cache_stale_sweep_time_sec,
num_shards,
/*element_count_capacity*/ 0, /*enable_prune*/ true,
/*is_lru_k*/ true) {}
bool lookup(const CacheKey& key, InvertedIndexQueryCacheHandle* handle);
void insert(const CacheKey& key, std::shared_ptr<roaring::Roaring> bitmap,
InvertedIndexQueryCacheHandle* handle);
};
class InvertedIndexQueryCacheHandle {
public:
InvertedIndexQueryCacheHandle() = default;
InvertedIndexQueryCacheHandle(LRUCachePolicy* cache, Cache::Handle* handle)
: _cache(cache), _handle(handle) {}
~InvertedIndexQueryCacheHandle() {
if (_handle != nullptr) {
_cache->release(_handle);
}
}
InvertedIndexQueryCacheHandle(InvertedIndexQueryCacheHandle&& other) noexcept {
// we can use std::exchange if we switch c++14 on
std::swap(_cache, other._cache);
std::swap(_handle, other._handle);
}
InvertedIndexQueryCacheHandle& operator=(InvertedIndexQueryCacheHandle&& other) noexcept {
std::swap(_cache, other._cache);
std::swap(_handle, other._handle);
return *this;
}
LRUCachePolicy* cache() const { return _cache; }
std::shared_ptr<roaring::Roaring> get_bitmap() const {
if (!_cache) {
return nullptr;
}
return ((InvertedIndexQueryCache::CacheValue*)_cache->value(_handle))->bitmap;
}
private:
LRUCachePolicy* _cache = nullptr;
Cache::Handle* _handle = nullptr;
// Don't allow copy and assign
DISALLOW_COPY_AND_ASSIGN(InvertedIndexQueryCacheHandle);
};
} // namespace segment_v2
} // namespace doris