-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Expand file tree
/
Copy pathscanner.h
More file actions
276 lines (211 loc) · 8.82 KB
/
scanner.h
File metadata and controls
276 lines (211 loc) · 8.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <stdint.h>
#include <algorithm>
#include <atomic>
#include <vector>
#include "common/metrics/doris_metrics.h"
#include "common/status.h"
#include "core/block/block.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "storage/tablet/tablet.h"
#include "util/stopwatch.hpp"
// Forward declarations so this header can name these types without pulling in
// their full definitions.
namespace doris {
class ScanLocalStateBase;
class RuntimeProfile;
class TupleDescriptor;
class VExprContext;
} // namespace doris
namespace doris {
// Row counters used for load; both start at zero.
struct ScannerCounter {
    // Unqualified rows: they do not match the destination schema, or have no partition.
    int64_t num_rows_filtered = 0;
    // Rows filtered out by predicates.
    int64_t num_rows_unselected = 0;
};
class Scanner {
public:
Scanner(RuntimeState* state, ScanLocalStateBase* local_state, int64_t limit,
RuntimeProfile* profile);
//only used for FileScanner read one line.
Scanner(RuntimeState* state, RuntimeProfile* profile)
: _state(state), _limit(1), _profile(profile), _total_rf_num(0), _has_prepared(false) {
DorisMetrics::instance()->scanner_cnt->increment(1);
};
virtual ~Scanner() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_state->query_mem_tracker());
_input_block.clear();
_conjuncts.clear();
_projections.clear();
_origin_block.clear();
_common_expr_ctxs_push_down.clear();
DorisMetrics::instance()->scanner_cnt->increment(-1);
}
virtual Status init(RuntimeState* state, const VExprContextSPtrs& conjuncts);
virtual Status prepare() {
_has_prepared = true;
return Status::OK();
}
Status open(RuntimeState* state) {
SCOPED_RAW_TIMER(&_per_scanner_timer);
return _open_impl(state);
}
Status get_block(RuntimeState* state, Block* block, bool* eos);
Status get_block_after_projects(RuntimeState* state, Block* block, bool* eos);
virtual Status close(RuntimeState* state);
// Try to stop scanner, and all running readers.
virtual void try_stop() { _should_stop = true; };
virtual std::string get_name() { return ""; }
// return the readable name of current scan range.
// eg, for file scanner, return the current file path.
virtual std::string get_current_scan_range_name() { return "not implemented"; }
protected:
virtual Status _open_impl(RuntimeState* state) {
_block_avg_bytes = state->batch_size() * 8;
return Status::OK();
}
// Subclass should implement this to return data.
virtual Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) = 0;
Status _merge_padding_block() {
if (_padding_block.empty()) {
_padding_block.swap(_origin_block);
} else if (_origin_block.rows()) {
RETURN_IF_ERROR(
MutableBlock::build_mutable_block(&_padding_block).merge(_origin_block));
}
return Status::OK();
}
// Update the counters before closing this scanner
virtual void _collect_profile_before_close();
// Check if scanner is already closed, if not, mark it as closed.
// Returns true if the scanner was successfully marked as closed (first time).
// Returns false if the scanner was already closed.
bool _try_close();
// Filter the output block finally.
Status _filter_output_block(Block* block);
Status _do_projections(Block* origin_block, Block* output_block);
private:
// Call start_wait_worker_timer() when submit the scanner to the thread pool.
// And call update_wait_worker_timer() when it is actually being executed.
void _start_wait_worker_timer() {
_watch.reset();
_watch.start();
}
void _start_scan_cpu_timer() {
_cpu_watch.reset();
_cpu_watch.start();
}
void _update_wait_worker_timer() { _scanner_wait_worker_timer += _watch.elapsed_time(); }
void _update_scan_cpu_timer();
public:
void resume() {
_update_wait_worker_timer();
_start_scan_cpu_timer();
}
void pause() {
_update_scan_cpu_timer();
_start_wait_worker_timer();
}
// Called when submitting the scanner to the thread pool queue.
// Only starts the wait timer without touching the CPU timer, because the CPU
// timer uses CLOCK_THREAD_CPUTIME_ID which must be read on the same thread
// that started it.
void start_queue_wait() { _start_wait_worker_timer(); }
int64_t get_time_cost_ns() const { return _per_scanner_timer; }
int64_t projection_time() const { return _projection_timer; }
int64_t get_rows_read() const { return _num_rows_read; }
bool has_prepared() const { return _has_prepared; }
Status try_append_late_arrival_runtime_filter();
int64_t get_scanner_wait_worker_timer() const { return _scanner_wait_worker_timer; }
// Some counters need to be updated realtime, for example, workload group policy need
// scan bytes to cancel the query exceed limit.
virtual void update_realtime_counters() {}
RuntimeState* runtime_state() { return _state; }
bool is_open() const { return _is_open; }
void set_opened() { _is_open = true; }
virtual doris::TabletStorageType get_storage_type() {
return doris::TabletStorageType::STORAGE_TYPE_REMOTE;
}
bool need_to_close() const { return _need_to_close; }
void mark_to_need_to_close() {
// If the scanner is failed during init or open, then not need update counters
// because the query is fail and the counter is useless. And it may core during
// update counters. For example, update counters depend on scanner's tablet, but
// the tablet == null when init failed.
if (_is_open) {
_collect_profile_before_close();
}
_need_to_close = true;
}
void set_status_on_failure(const Status& st) { _status = st; }
int64_t limit() const { return _limit; }
auto get_block_avg_bytes() const { return _block_avg_bytes; }
void update_block_avg_bytes(size_t block_avg_bytes) { _block_avg_bytes = block_avg_bytes; }
protected:
RuntimeState* _state = nullptr;
ScanLocalStateBase* _local_state = nullptr;
// Set if scan node has sort limit info
int64_t _limit = -1;
RuntimeProfile* _profile = nullptr;
const TupleDescriptor* _output_tuple_desc = nullptr;
const RowDescriptor* _output_row_descriptor = nullptr;
// If _input_tuple_desc is set, the scanner will read data into
// this _input_block first, then convert to the output block.
Block _input_block;
bool _is_open = false;
std::atomic<bool> _is_closed {false};
bool _need_to_close = false;
Status _status;
// If _applied_rf_num == _total_rf_num
// means all runtime filters are arrived and applied.
int _applied_rf_num = 0;
int _total_rf_num = 0;
// Cloned from _conjuncts of scan node.
// It includes predicate in SQL and runtime filters.
VExprContextSPtrs _conjuncts;
VExprContextSPtrs _projections;
// Used in common subexpression elimination to compute intermediate results.
std::vector<VExprContextSPtrs> _intermediate_projections;
Block _origin_block;
Block _padding_block;
bool _alreay_eos = false;
VExprContextSPtrs _common_expr_ctxs_push_down;
// num of rows read from scanner
int64_t _num_rows_read = 0;
int64_t _num_byte_read = 0;
// num of rows return from scanner, after filter block
int64_t _num_rows_return = 0;
size_t _block_avg_bytes = 0;
// Set true after counter is updated finally
bool _has_updated_counter = false;
// watch to count the time wait for scanner thread
MonotonicStopWatch _watch;
// Do not use ScopedTimer. There is no guarantee that, the counter
ThreadCpuStopWatch _cpu_watch;
int64_t _scanner_wait_worker_timer = 0;
int64_t _scan_cpu_timer = 0;
bool _is_load = false;
bool _has_prepared = false;
ScannerCounter _counter;
int64_t _per_scanner_timer = 0;
int64_t _projection_timer = 0;
bool _should_stop = false;
};
using ScannerSPtr = std::shared_ptr<Scanner>;
} // namespace doris