-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Expand file tree
/
Copy pathpoint_query_executor.h
More file actions
350 lines (290 loc) · 11.7 KB
/
point_query_executor.h
File metadata and controls
350 lines (290 loc) · 11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <assert.h>
#include <butil/macros.h>
#include <butil/time.h>
#include <gen_cpp/Metrics_types.h>
#include <parallel_hashmap/phmap.h>
#include <stdint.h>
#include <string.h>
#include <algorithm>
#include <memory>
#include <mutex>
#include <optional>
#include <ostream>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "butil/containers/doubly_buffered_data.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "core/block/block.h"
#include "core/data_type_serde/data_type_serde.h"
#include "cpp/lru_cache.h"
#include "exprs/vexpr_fwd.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_profile.h"
#include "storage/olap_common.h"
#include "storage/rowset/rowset.h"
#include "storage/tablet/tablet.h"
#include "storage/utils.h"
#include "util/mysql_global.h"
#include "util/slice.h"
namespace doris {
// Forward declarations of types referenced by the declarations below.
class PTabletKeyLookupRequest;
class PTabletKeyLookupResponse;
class RuntimeState;
class TDescriptorTable;
class TExpr;
// For caching point lookup pre-allocated blocks and exprs.
class Reusable {
public:
    ~Reusable();

    // Returns true when more than `ttl_ms` milliseconds separate the current
    // wall-clock time from `_create_timestamp`.
    bool is_expired(int64_t ttl_ms) const {
        return butil::gettimeofday_ms() - _create_timestamp > ttl_ms;
    }

    // Initializes this object from the thrift descriptor table, the output
    // expressions, the query options and the tablet schema.
    // `block_size` defaults to 1 (presumably the number of pre-allocated
    // blocks; confirm against the implementation).
    Status init(const TDescriptorTable& t_desc_tbl, const std::vector<TExpr>& output_exprs,
                const TQueryOptions& query_options, const TabletSchema& schema,
                size_t block_size = 1);

    // Hands out a block owned by the caller; pair with return_block().
    std::unique_ptr<Block> get_block();

    const DataTypeSerDeSPtrs& get_data_type_serdes() const { return _data_type_serdes; }

    const std::unordered_map<uint32_t, uint32_t>& get_col_uid_to_idx() const {
        return _col_uid_to_idx;
    }

    const std::vector<std::string>& get_col_default_values() const { return _col_default_values; }

    // do not touch block after returned
    void return_block(std::unique_ptr<Block>& block);

    // Tuple descriptor 0 of the cached descriptor table. Dereferences
    // `_desc_tbl` unconditionally, so it must not be called while `_desc_tbl`
    // is still null.
    TupleDescriptor* tuple_desc() { return _desc_tbl->get_tuple_descriptor(0); }

    const VExprContextSPtrs& output_exprs() { return _output_exprs_ctxs; }

    // Unique id of the picked rowstore (column group) column, -1 when unset.
    int32_t rs_column_uid() const { return _row_store_column_ids; }

    // Both accessors return a reference to the member rather than a copy, so
    // that callers do not pay for duplicating the whole set on every call.
    // The reference is valid for as long as this Reusable is alive.
    const std::unordered_set<int32_t>& missing_col_uids() const { return _missing_col_uids; }
    const std::unordered_set<int32_t>& include_col_uids() const { return _include_col_uids; }

    RuntimeState* runtime_state() { return _runtime_state.get(); }

    // delete sign idx in block
    int32_t delete_sign_idx() const { return _delete_sign_idx; }

private:
    // caching TupleDescriptor, output_expr, etc...
    std::unique_ptr<RuntimeState> _runtime_state;
    DescriptorTbl* _desc_tbl = nullptr;
    std::mutex _block_mutex;
    // prevent from allocating too many tmp blocks
    std::vector<std::unique_ptr<Block>> _block_pool;
    VExprContextSPtrs _output_exprs_ctxs;
    // Compared against butil::gettimeofday_ms() in is_expired().
    int64_t _create_timestamp = 0;
    DataTypeSerDeSPtrs _data_type_serdes;
    std::unordered_map<uint32_t, uint32_t> _col_uid_to_idx;
    std::vector<std::string> _col_default_values;
    // picked rowstore(column group) column unique id
    int32_t _row_store_column_ids = -1;
    // some column is missing in rowstore(column group), we need to fill them with column store values
    std::unordered_set<int32_t> _missing_col_uids;
    // included cids in rowstore(column group)
    std::unordered_set<int32_t> _include_col_uids;
    // delete sign idx in block
    int32_t _delete_sign_idx = -1;
};
// RowCache is a LRU cache for row store
class RowCache : public LRUCachePolicy {
public:
    using LRUCachePolicy::insert;

    // The cache key for row lru cache
    struct RowCacheKey {
        RowCacheKey(int64_t tablet_id, const Slice& key) : tablet_id(tablet_id), key(key) {}
        int64_t tablet_id;
        Slice key;

        // Encode to a flat binary which can be used as LRUCache's key:
        // the raw bytes of `tablet_id` followed by the bytes of `key`.
        std::string encode() const {
            std::string full_key;
            full_key.reserve(sizeof(int64_t) + key.size);
            const char* tid = reinterpret_cast<const char*>(&tablet_id);
            full_key.append(tid, tid + sizeof(int64_t));
            full_key.append(key.data, key.size);
            return full_key;
        }
    };

    // Cache payload. `cache_value` is released with free() on destruction.
    class RowCacheValue : public LRUCacheValueBase {
    public:
        ~RowCacheValue() override { free(cache_value); }
        // Initialized to nullptr so that destroying a value that was never
        // assigned a buffer calls free(nullptr), which is a no-op, instead of
        // freeing an indeterminate pointer.
        char* cache_value = nullptr;
    };

    // A handle for RowCache entry. This class make it easy to handle
    // Cache entry. Users don't need to release the obtained cache entry. This
    // class will release the cache entry when it is destroyed.
    class CacheHandle {
    public:
        CacheHandle() = default;
        CacheHandle(LRUCachePolicy* cache, Cache::Handle* handle)
                : _cache(cache), _handle(handle) {}

        ~CacheHandle() {
            if (_handle != nullptr) {
                _cache->release(_handle);
            }
        }

        // Members start out null via their default initializers, so swapping
        // leaves `other` empty.
        CacheHandle(CacheHandle&& other) noexcept {
            std::swap(_cache, other._cache);
            std::swap(_handle, other._handle);
        }

        // Swaps state with `other`; whatever entry this handle held before is
        // released when `other` is destroyed.
        CacheHandle& operator=(CacheHandle&& other) noexcept {
            std::swap(_cache, other._cache);
            std::swap(_handle, other._handle);
            return *this;
        }

        // True when this handle refers to an entry of some cache.
        bool valid() const { return _cache != nullptr && _handle != nullptr; }

        LRUCachePolicy* cache() const { return _cache; }

        // View over the cached row bytes. Dereferences `_cache` and `_handle`
        // unconditionally, so call it only on a valid() handle. The length is
        // taken from the LRU handle's `charge`.
        Slice data() const {
            auto* row_value = static_cast<RowCacheValue*>(_cache->value(_handle));
            return {row_value->cache_value, reinterpret_cast<LRUHandle*>(_handle)->charge};
        }

    private:
        LRUCachePolicy* _cache = nullptr;
        Cache::Handle* _handle = nullptr;
        // Don't allow copy and assign
        DISALLOW_COPY_AND_ASSIGN(CacheHandle);
    };

    // Create global instance of this class
    static RowCache* create_global_cache(int64_t capacity, uint32_t num_shards = kDefaultNumShards);
    static RowCache* instance();

    // Lookup a row key from cache,
    // If the Row key is found, the cache entry will be written into handle.
    // CacheHandle will release cache entry to cache when it destructs
    // Return true if entry is found, otherwise return false.
    bool lookup(const RowCacheKey& key, CacheHandle* handle);

    // Insert a row with key into this cache.
    // This function is thread-safe, and when two clients insert two same key
    // concurrently, this function can assure that only one page is cached.
    // The in_memory page will have higher priority.
    // NOTE(review): the wording above talks about pages although this cache
    // stores rows; confirm it still describes the implementation.
    void insert(const RowCacheKey& key, const Slice& data);

    // Erase the entry for `key`; the implementation lives outside this header.
    void erase(const RowCacheKey& key);

private:
    static constexpr uint32_t kDefaultNumShards = 128;
    RowCache(int64_t capacity, int num_shards = kDefaultNumShards);
};
// A cache used for prepare stmt.
// One connection per stmt perf uuid
class LookupConnectionCache : public LRUCachePolicy {
public:
    // Returns the instance held by the process-wide ExecEnv.
    static LookupConnectionCache* instance() {
        return ExecEnv::GetInstance()->get_lookup_connection_cache();
    }

    static LookupConnectionCache* create_global_instance(size_t capacity);

private:
    friend class PointQueryExecutor;

    // `explicit` prevents a size_t from silently converting into a cache.
    explicit LookupConnectionCache(size_t capacity)
            : LRUCachePolicy(CachePolicy::CacheType::LOOKUP_CONNECTION_CACHE, capacity,
                             LRUCacheType::NUMBER, config::tablet_lookup_cache_stale_sweep_time_sec,
                             /*num shards*/ 32, /*element count capacity */ 0,
                             /*enable prune*/ true, /*is lru-k*/ true) {}

    // Formats the 128-bit cache id as its decimal text form, used as the LRU key.
    static std::string encode_key(__int128_t cache_id) {
        fmt::memory_buffer buffer;
        fmt::format_to(buffer, "{}", cache_id);
        return std::string(buffer.data(), buffer.size());
    }

    // Stores `item` under `cache_id`. The handle returned by insert() is
    // released right away; the cache keeps the entry.
    void add(__int128_t cache_id, std::shared_ptr<Reusable> item) {
        std::string key = encode_key(cache_id);
        auto* value = new CacheValue;
        // `item` is a by-value sink parameter: move it into the cache value
        // instead of copying, which avoids an extra atomic refcount
        // increment/decrement pair.
        value->item = std::move(item);
        VLOG_DEBUG << "Add item mem"
                   << ", cache_capacity: " << get_capacity() << ", cache_usage: " << get_usage()
                   << ", mem_consum: " << mem_consumption();
        auto* lru_handle = insert(key, value, 1, sizeof(Reusable), CachePriority::NORMAL);
        release(lru_handle);
    }

    // Returns the item cached under `cache_id`, or nullptr when lookup()
    // finds nothing. The LRU handle is released on scope exit, after the
    // returned shared_ptr copy has been made.
    std::shared_ptr<Reusable> get(__int128_t cache_id) {
        std::string key = encode_key(cache_id);
        auto* lru_handle = lookup(key);
        if (lru_handle != nullptr) {
            Defer release([cache = this, lru_handle] { cache->release(lru_handle); });
            auto* value = static_cast<CacheValue*>(LRUCachePolicy::value(lru_handle));
            return value->item;
        }
        return nullptr;
    }

    // Cache payload: shares ownership of a Reusable.
    class CacheValue : public LRUCacheValueBase {
    public:
        ~CacheValue() override;
        std::shared_ptr<Reusable> item;
    };
};
// Per-request profiling data for a point query. All timing counters are
// created with the TUnit::TIME_NS unit.
struct Metrics {
    Metrics()
            : init_ns(TUnit::TIME_NS),
              init_key_ns(TUnit::TIME_NS),
              lookup_key_ns(TUnit::TIME_NS),
              lookup_data_ns(TUnit::TIME_NS),
              output_data_ns(TUnit::TIME_NS),
              load_segment_key_stage_ns(TUnit::TIME_NS),
              load_segment_data_stage_ns(TUnit::TIME_NS) {}

    RuntimeProfile::Counter init_ns;
    RuntimeProfile::Counter init_key_ns;
    RuntimeProfile::Counter lookup_key_ns;
    RuntimeProfile::Counter lookup_data_ns;
    RuntimeProfile::Counter output_data_ns;
    RuntimeProfile::Counter load_segment_key_stage_ns;
    RuntimeProfile::Counter load_segment_data_stage_ns;
    OlapReaderStatistics read_stats;
    size_t row_cache_hits = 0;
    bool hit_lookup_cache = false;
    // Zero-initialized like the other fields above, so reading it before it
    // has been assigned does not observe an indeterminate value.
    size_t result_data_bytes = 0;
};
// An util to do tablet lookup
class PointQueryExecutor {
public:
~PointQueryExecutor();
Status init(const PTabletKeyLookupRequest* request, PTabletKeyLookupResponse* response);
Status lookup_up();
void print_profile();
const OlapReaderStatistics& read_stats() const { return _read_stats; }
private:
Status _init_keys(const PTabletKeyLookupRequest* request);
Status _lookup_row_key();
Status _lookup_row_data();
Status _output_data();
static void release_rowset(RowsetSharedPtr* r) {
if (r && *r) {
VLOG_DEBUG << "release rowset " << (*r)->rowset_id();
(*r)->release();
}
delete r;
}
// Read context for each row
struct RowReadContext {
RowReadContext() : _rowset_ptr(nullptr, &release_rowset) {}
std::string _primary_key;
RowCache::CacheHandle _cached_row_data;
std::optional<RowLocation> _row_location;
// rowset will be aquired during read
// and released after used
std::unique_ptr<RowsetSharedPtr, decltype(&release_rowset)> _rowset_ptr;
};
PTabletKeyLookupResponse* _response = nullptr;
BaseTabletSPtr _tablet;
std::vector<RowReadContext> _row_read_ctxs;
std::shared_ptr<Reusable> _reusable;
std::unique_ptr<Block> _result_block;
Metrics _profile_metrics;
bool _binary_row_format = false;
OlapReaderStatistics _read_stats;
int32_t _row_hits = 0;
// snapshot read version
int64_t _version = -1;
};
} // namespace doris