Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 90 additions & 1 deletion be/src/exec/operator/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <fmt/format.h>

#include <algorithm>
#include <memory>
#include <numeric>
#include <optional>
Expand Down Expand Up @@ -54,6 +55,50 @@

namespace doris {

namespace {

constexpr int64_t MAX_PROFILE_KEY_RANGES = 32;

int64_t key_range_count(const std::vector<std::unique_ptr<OlapScanRange>>& ranges) {
int64_t count = 0;
for (const auto& range : ranges) {
if (range->has_lower_bound) {
++count;
}
}
return count;
}

std::string scan_keys_profile_string(const std::vector<std::unique_ptr<OlapScanRange>>& ranges) {
fmt::memory_buffer scan_keys_buffer;
int64_t range_count = 0;
int64_t printed = 0;
for (const auto& range : ranges) {
if (!range->has_lower_bound) {
continue;
}
if (printed < MAX_PROFILE_KEY_RANGES) {
if (printed > 0) {
fmt::format_to(scan_keys_buffer, "; ");
}
fmt::format_to(scan_keys_buffer, "{}{} : {}{}", range->begin_include ? "[" : "(",
range->begin_scan_range.debug_string(),
range->end_scan_range.debug_string(), range->end_include ? "]" : ")");
++printed;
}
++range_count;
}
if (range_count > printed) {
if (printed > 0) {
fmt::format_to(scan_keys_buffer, "; ");
}
fmt::format_to(scan_keys_buffer, "... {} more", range_count - printed);
}
return fmt::to_string(scan_keys_buffer);
}

} // namespace

Status OlapScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
const TOlapScanNode& olap_scan_node = _parent->cast<OlapScanOperatorX>()._olap_scan_node;

Expand Down Expand Up @@ -582,6 +627,27 @@ bool OlapScanLocalState::_read_mor_as_dup() {
return p._olap_scan_node.__isset.read_mor_as_dup && p._olap_scan_node.read_mor_as_dup;
}

void OlapScanLocalState::_register_key_range_scan_filter() {
if (_scan_filter_profile == nullptr || _key_range_scan_filter) {
return;
}

std::vector<int32_t> source_filter_ids;
for (const auto& [_, filter_ids] : _slot_id_to_scan_filter_ids_for_key_range) {
source_filter_ids.insert(source_filter_ids.end(), filter_ids.begin(), filter_ids.end());
}
std::ranges::sort(source_filter_ids);
source_filter_ids.erase(std::ranges::unique(source_filter_ids).begin(),
source_filter_ids.end());

ScanFilterDesc desc;
desc.kind = ScanFilterKind::KEY_RANGE;
desc.compact_info = scan_keys_profile_string(_cond_ranges);
desc.source_filter_ids = std::move(source_filter_ids);
desc.range_count = key_range_count(_cond_ranges);
_key_range_scan_filter = _scan_filter_profile->register_filter(std::move(desc));
}

Status OlapScanLocalState::_init_scanners(std::list<ScannerSPtr>* scanners) {
if (_scan_ranges.empty()) {
_eos = true;
Expand All @@ -605,6 +671,10 @@ Status OlapScanLocalState::_init_scanners(std::list<ScannerSPtr>* scanners) {
if (_cond_ranges.empty()) {
_cond_ranges.emplace_back(new doris::OlapScanRange());
}
if (std::ranges::any_of(_cond_ranges,
[](const auto& range) { return range->has_lower_bound; })) {
_register_key_range_scan_filter();
}

// Filter out tablets whose partitions have been pruned by runtime filters.
//
Expand Down Expand Up @@ -1191,6 +1261,16 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() {
for (const auto& it : _slot_id_to_predicates[*key_to_erase]) {
if (!can_erase_predicate(*it)) {
new_predicates.push_back(it);
} else if (_scan_filter_profile != nullptr) {
const auto& handle = it->scan_filter_handle();
if (handle) {
auto& source_ids =
_slot_id_to_scan_filter_ids_for_key_range[*key_to_erase];
if (std::find(source_ids.begin(), source_ids.end(), handle.filter_id) ==
source_ids.end()) {
source_ids.push_back(handle.filter_id);
}
}
}
Comment thread
BiteTheDDDDt marked this conversation as resolved.
}
if (new_predicates.empty()) {
Expand All @@ -1211,7 +1291,9 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() {
}

if (state()->enable_profile()) {
custom_profile()->add_info_string("KeyRanges", _scan_keys.debug_string());
if (_scan_filter_profile == nullptr) {
custom_profile()->add_info_string("KeyRanges", _scan_keys.debug_string());
}
custom_profile()->add_info_string("TabletIds", tablets_id_to_string(_scan_ranges));
}
VLOG_CRITICAL << _scan_keys.debug_string();
Expand Down Expand Up @@ -1280,4 +1362,11 @@ void OlapScanLocalState::_attach_partition_boundaries() {
COUNTER_SET(_total_partitions_rf_counter, parsed->total_partitions());
}

ScanRuntimeFilterPartitionPruningStats OlapScanLocalState::_runtime_filter_partition_pruning_stats()
const {
auto stats = ScanLocalStateBase::_runtime_filter_partition_pruning_stats();
stats.pruned_tablets = _tablets_pruned_by_rf_counter->value();
return stats;
}

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/exec/operator/olap_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
Status _init_scanners(std::list<ScannerSPtr>* scanners) override;

Status _build_key_ranges_and_filters();
void _register_key_range_scan_filter();

std::vector<std::unique_ptr<TPaloScanRange>> _scan_ranges;
std::vector<SyncRowsetStats> _sync_statistics;
Expand All @@ -138,6 +139,7 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
std::atomic_bool _sync_tablet = false;
std::vector<std::unique_ptr<doris::OlapScanRange>> _cond_ranges;
OlapScanKeys _scan_keys;
ScanFilterHandle _key_range_scan_filter;
// If column id in this set, indicate that we need to read data after index filtering
std::set<int32_t> _output_column_ids;

Expand Down Expand Up @@ -337,6 +339,7 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
// OlapScanOperatorX (parsed once in OperatorX::prepare()). Cheap: pointer
// assignment plus a counter set, no parsing work.
void _attach_partition_boundaries();
ScanRuntimeFilterPartitionPruningStats _runtime_filter_partition_pruning_stats() const override;

RuntimeProfile::Counter* _tablets_pruned_by_rf_counter = nullptr;
};
Expand Down
107 changes: 85 additions & 22 deletions be/src/exec/operator/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/Metrics_types.h>

#include <algorithm>
#include <cstdint>
#include <memory>

Expand Down Expand Up @@ -77,6 +78,14 @@ Status ScanLocalStateBase::update_late_arrival_runtime_filter(RuntimeState* stat
size_t conjuncts_before = _conjuncts.size();
RETURN_IF_ERROR(_helper.try_append_late_arrival_runtime_filter(state, _parent->row_descriptor(),
arrived_rf_num, _conjuncts));
if (_scan_filter_profile != nullptr) {
for (size_t i = conjuncts_before; i < _conjuncts.size(); ++i) {
if (_conjuncts[i]->root() != nullptr && !_conjuncts[i]->scan_filter_handle()) {
_conjuncts[i]->attach_scan_filter(
_register_scan_filter(_conjuncts[i]->root(), nullptr));
}
Comment thread
BiteTheDDDDt marked this conversation as resolved.
}
}
if (state->enable_adjust_conjunct_order_by_cost()) {
std::ranges::sort(_conjuncts, [](const auto& a, const auto& b) {
return a->execute_cost() < b->execute_cost();
Expand Down Expand Up @@ -134,6 +143,12 @@ Status ScanLocalStateBase::_do_partition_pruning_by_rf() {
return Status::OK();
}

ScanRuntimeFilterPartitionPruningStats ScanLocalStateBase::_runtime_filter_partition_pruning_stats()
const {
return {.total_partitions = _total_partitions_rf_counter->value(),
.pruned_partitions = _partitions_pruned_by_rf_counter->value()};
}

int ScanLocalStateBase::max_scanners_concurrency(RuntimeState* state) const {
// For select * from table limit 10; should just use one thread.
if (should_run_serial()) {
Expand Down Expand Up @@ -180,6 +195,9 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info)
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<typename Derived::Parent>();
_max_pushdown_conditions_per_column = p._max_pushdown_conditions_per_column;
if (state->enable_profile() && state->profile_level() >= 1) {
_scan_filter_profile = std::make_shared<ScanFilterProfile>();
}
RETURN_IF_ERROR(_helper.init(state, p.is_serial_operator(), p.node_id(), p.operator_id(),
_filter_dependencies, p.get_name() + "_FILTER_DEPENDENCY"));
RETURN_IF_ERROR(_init_profile());
Expand Down Expand Up @@ -207,6 +225,31 @@ static std::string predicates_to_string(
return fmt::to_string(debug_string_buffer);
}

ScanFilterHandle ScanLocalStateBase::_register_scan_filter(const VExprSPtr& root,
const SlotDescriptor* slot) {
if (_scan_filter_profile == nullptr || root == nullptr) {
return {};
}

ScanFilterDesc desc;
desc.kind = ScanFilterKind::NORMAL;
if (root->is_rf_wrapper()) {
desc.kind = ScanFilterKind::RUNTIME_FILTER;
desc.runtime_filter_id = assert_cast<RuntimeFilterExpr*>(root.get())->filter_id();
} else if (root->is_topn_filter()) {
desc.kind = ScanFilterKind::TOPN_FILTER;
desc.topn_filter_source_node_id = assert_cast<VTopNPred*>(root.get())->source_node_id();
}
if (slot != nullptr) {
desc.slot_id = slot->id();
desc.column_name = slot->col_name();
desc.column_id = _parent->intermediate_row_desc().get_column_id(slot->id());
}
desc.debug_string = root->debug_string();
desc.compact_info = desc.debug_string;
return _scan_filter_profile->register_filter(std::move(desc));
}

template <typename Derived>
Status ScanLocalState<Derived>::open(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
Expand Down Expand Up @@ -251,7 +294,7 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {

RETURN_IF_ERROR(_process_conjuncts(state));

if (state->enable_profile()) {
if (state->enable_profile() && _scan_filter_profile == nullptr) {
custom_profile()->add_info_string("PushDownPredicates",
predicates_to_string(_slot_id_to_predicates));
}
Expand Down Expand Up @@ -354,6 +397,9 @@ Status ScanLocalState<Derived>::_normalize_conjuncts(RuntimeState* state) {
RETURN_IF_ERROR(_normalize_predicate(conjunct.get(), conjunct->root(), new_root));
if (new_root) {
conjunct->set_root(new_root);
if (!conjunct->scan_filter_handle()) {
conjunct->attach_scan_filter(_register_scan_filter(conjunct->root(), nullptr));
}
if (_should_push_down_common_expr(conjunct->root())) {
_common_expr_ctxs_push_down.emplace_back(conjunct);
it = _conjuncts.erase(it);
Expand All @@ -369,7 +415,7 @@ Status ScanLocalState<Derived>::_normalize_conjuncts(RuntimeState* state) {
++it;
}

if (state->enable_profile()) {
if (state->enable_profile() && _scan_filter_profile == nullptr) {
std::string message;
for (auto& conjunct : _conjuncts) {
if (conjunct->root()) {
Expand Down Expand Up @@ -424,14 +470,16 @@ Status ScanLocalState<Derived>::_normalize_predicate(VExprContext* context, cons
}
if (_is_predicate_acting_on_slot(expr_root->children(), &slot, &range)) {
Status status = Status::OK();
auto& slot_predicates = _slot_id_to_predicates[slot->id()];
const size_t predicates_before = slot_predicates.size();
std::visit(
[&](auto& value_range) {
auto expr = root->is_rf_wrapper() ? root->get_impl() : root;
{
Defer attach_defer = [&]() {
if (pdt != PushDownType::UNACCEPTABLE && root->is_rf_wrapper()) {
auto* rf_expr = assert_cast<RuntimeFilterExpr*>(root.get());
_slot_id_to_predicates[slot->id()].back()->attach_profile_counter(
slot_predicates.back()->attach_profile_counter(
rf_expr->filter_id(),
rf_expr->predicate_filtered_rows_counter(),
rf_expr->predicate_input_rows_counter(),
Expand All @@ -442,43 +490,36 @@ Status ScanLocalState<Derived>::_normalize_predicate(VExprContext* context, cons
switch (expr->node_type()) {
case TExprNodeType::IN_PRED:
RETURN_IF_PUSH_DOWN(
_normalize_in_predicate(context, expr, slot,
_slot_id_to_predicates[slot->id()],
_normalize_in_predicate(context, expr, slot, slot_predicates,
value_range, &pdt),
status);
break;
case TExprNodeType::BINARY_PRED:
RETURN_IF_PUSH_DOWN(
_normalize_binary_predicate(context, expr, slot,
_slot_id_to_predicates[slot->id()],
value_range, &pdt),
slot_predicates, value_range, &pdt),
status);
break;
case TExprNodeType::FUNCTION_CALL:
if (expr->is_topn_filter()) {
RETURN_IF_PUSH_DOWN(
_normalize_topn_filter(context, expr, slot,
_slot_id_to_predicates[slot->id()],
&pdt),
status);
RETURN_IF_PUSH_DOWN(_normalize_topn_filter(context, expr, slot,
slot_predicates, &pdt),
status);
} else {
RETURN_IF_PUSH_DOWN(_normalize_is_null_predicate(
context, expr, slot,
_slot_id_to_predicates[slot->id()],
context, expr, slot, slot_predicates,
value_range, &pdt),
status);
}
break;
case TExprNodeType::BITMAP_PRED:
RETURN_IF_PUSH_DOWN(_normalize_bitmap_filter(
context, root, slot,
_slot_id_to_predicates[slot->id()], &pdt),
RETURN_IF_PUSH_DOWN(_normalize_bitmap_filter(context, root, slot,
slot_predicates, &pdt),
status);
break;
case TExprNodeType::BLOOM_PRED:
RETURN_IF_PUSH_DOWN(_normalize_bloom_filter(
context, root, slot,
_slot_id_to_predicates[slot->id()], &pdt),
RETURN_IF_PUSH_DOWN(_normalize_bloom_filter(context, root, slot,
slot_predicates, &pdt),
status);
break;
default:
Expand All @@ -493,6 +534,18 @@ Status ScanLocalState<Derived>::_normalize_predicate(VExprContext* context, cons
},
*range);
RETURN_IF_ERROR(status);
if (pdt != PushDownType::UNACCEPTABLE) {
auto handle = context->scan_filter_handle();
if (!handle) {
handle = _register_scan_filter(root, slot);
context->attach_scan_filter(handle);
}
if (handle) {
for (size_t i = predicates_before; i < slot_predicates.size(); ++i) {
slot_predicates[i]->attach_scan_filter(handle);
}
}
}
}
if (pdt == PushDownType::ACCEPTABLE && slotref != nullptr &&
slotref->data_type()->get_primitive_type() == PrimitiveType::TYPE_VARIANT) {
Expand Down Expand Up @@ -614,7 +667,12 @@ Status ScanLocalStateBase::_normalize_function_filters(VExprContext* expr_ctx, S
expr_ctx, &val, &fn_ctx, temp_pdt));
if (temp_pdt != PushDownType::UNACCEPTABLE) {
std::string col = slot->col_name();
_push_down_functions.emplace_back(opposite, col, fn_ctx, val);
auto handle = expr_ctx->scan_filter_handle();
if (!handle) {
handle = _register_scan_filter(expr_ctx->root(), slot);
expr_ctx->attach_scan_filter(handle);
}
_push_down_functions.emplace_back(opposite, col, fn_ctx, val, handle);
*pdt = temp_pdt;
}
}
Expand Down Expand Up @@ -1343,7 +1401,12 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
std::list<std::shared_ptr<ScannerDelegate>> {}.swap(_scanners);
COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time());
COUNTER_SET(_wait_for_rf_timer, rf_time);
_helper.collect_realtime_profile(custom_profile());
_helper.collect_realtime_profile(custom_profile(), _scan_filter_profile.get());
if (_scan_filter_profile != nullptr) {
_scan_filter_profile->set_runtime_filter_partition_pruning_stats(
_runtime_filter_partition_pruning_stats());
_scan_filter_profile->materialize(custom_profile(), state->profile_level());
}
return PipelineXLocalState<>::close(state);
}

Expand Down
Loading
Loading