From 84a6b78dab73099aeaabb351276fe74290876bb4 Mon Sep 17 00:00:00 2001
From: Copilot <223556219+Copilot@users.noreply.github.com>
Date: Sat, 6 Jun 2026 11:59:24 +0800
Subject: [PATCH] [feature](be) Add OLAP scan filter profile counters

### What problem does this PR solve?

Issue Number: N/A

Related PR: N/A

Problem Summary: Existing OLAP scan profiles only expose aggregate filter counters, making it hard to understand the selectivity of each conjunct, runtime filter, key range, and index-related filtering step. This change adds scan-local filter ids and materializes detailed ScanFilterInfo and KeyRangeInfo profile sections. Per-filter counters now show input rows, filtered rows, stage participation, source expression or runtime filter debug text, and runtime filter wait or always-true data where applicable. Runtime filter dynamic partition pruning is exposed as a separate ScanFilterInfo child because it prunes partitions/tablets before segment-level row filtering. Empty mechanism-specific fields are omitted when they do not participate, TopN filter sources include their source node id, and runtime filter ids remain distinct from scan filter ids.

### Release note

Add detailed OLAP scan filter profile counters to the query profile.

### Check List (For Author)

- Test: Manual test
    - ./build.sh --be
    - git diff --cached --check
    - Ran TopN filter profile case against local cluster on 127.0.0.1:9333
- Behavior changed: Yes. Query profile includes detailed OLAP ScanFilterInfo and KeyRangeInfo counters.
- Does this need documentation: No --- be/src/exec/operator/olap_scan_operator.cpp | 91 ++- be/src/exec/operator/olap_scan_operator.h | 3 + be/src/exec/operator/scan_operator.cpp | 107 ++- be/src/exec/operator/scan_operator.h | 9 + .../runtime_filter_consumer.cpp | 13 + .../runtime_filter/runtime_filter_consumer.h | 3 + .../runtime_filter_consumer_helper.cpp | 20 +- .../runtime_filter_consumer_helper.h | 7 +- be/src/exec/scan/olap_scanner.cpp | 8 +- be/src/exec/scan/scanner.cpp | 3 +- be/src/exprs/function_filter.h | 9 +- be/src/exprs/vexpr_context.cpp | 48 +- be/src/exprs/vexpr_context.h | 39 +- be/src/runtime/scan_filter_profile.cpp | 638 ++++++++++++++++++ be/src/runtime/scan_filter_profile.h | 153 +++++ be/src/storage/iterators.h | 4 + .../predicate/block_column_predicate.cpp | 46 ++ .../predicate/block_column_predicate.h | 38 ++ be/src/storage/predicate/column_predicate.h | 7 + be/src/storage/rowset/beta_rowset_reader.cpp | 2 + be/src/storage/rowset/rowset_reader_context.h | 3 + be/src/storage/segment/column_reader.cpp | 75 +- be/src/storage/segment/column_reader.h | 3 +- be/src/storage/segment/segment_iterator.cpp | 71 +- be/src/storage/segment/segment_iterator.h | 6 +- be/src/storage/tablet/tablet_reader.cpp | 8 +- be/src/storage/tablet/tablet_reader.h | 3 + 27 files changed, 1331 insertions(+), 86 deletions(-) create mode 100644 be/src/runtime/scan_filter_profile.cpp create mode 100644 be/src/runtime/scan_filter_profile.h diff --git a/be/src/exec/operator/olap_scan_operator.cpp b/be/src/exec/operator/olap_scan_operator.cpp index 6e7946a082b249..ad56b2e3f26e5c 100644 --- a/be/src/exec/operator/olap_scan_operator.cpp +++ b/be/src/exec/operator/olap_scan_operator.cpp @@ -19,6 +19,7 @@ #include +#include #include #include #include @@ -54,6 +55,50 @@ namespace doris { +namespace { + +constexpr int64_t MAX_PROFILE_KEY_RANGES = 32; + +int64_t key_range_count(const std::vector>& ranges) { + int64_t count = 0; + for (const auto& range : ranges) { + if 
(range->has_lower_bound) { + ++count; + } + } + return count; +} + +std::string scan_keys_profile_string(const std::vector>& ranges) { + fmt::memory_buffer scan_keys_buffer; + int64_t range_count = 0; + int64_t printed = 0; + for (const auto& range : ranges) { + if (!range->has_lower_bound) { + continue; + } + if (printed < MAX_PROFILE_KEY_RANGES) { + if (printed > 0) { + fmt::format_to(scan_keys_buffer, "; "); + } + fmt::format_to(scan_keys_buffer, "{}{} : {}{}", range->begin_include ? "[" : "(", + range->begin_scan_range.debug_string(), + range->end_scan_range.debug_string(), range->end_include ? "]" : ")"); + ++printed; + } + ++range_count; + } + if (range_count > printed) { + if (printed > 0) { + fmt::format_to(scan_keys_buffer, "; "); + } + fmt::format_to(scan_keys_buffer, "... {} more", range_count - printed); + } + return fmt::to_string(scan_keys_buffer); +} + +} // namespace + Status OlapScanLocalState::init(RuntimeState* state, LocalStateInfo& info) { const TOlapScanNode& olap_scan_node = _parent->cast()._olap_scan_node; @@ -582,6 +627,27 @@ bool OlapScanLocalState::_read_mor_as_dup() { return p._olap_scan_node.__isset.read_mor_as_dup && p._olap_scan_node.read_mor_as_dup; } +void OlapScanLocalState::_register_key_range_scan_filter() { + if (_scan_filter_profile == nullptr || _key_range_scan_filter) { + return; + } + + std::vector source_filter_ids; + for (const auto& [_, filter_ids] : _slot_id_to_scan_filter_ids_for_key_range) { + source_filter_ids.insert(source_filter_ids.end(), filter_ids.begin(), filter_ids.end()); + } + std::ranges::sort(source_filter_ids); + source_filter_ids.erase(std::ranges::unique(source_filter_ids).begin(), + source_filter_ids.end()); + + ScanFilterDesc desc; + desc.kind = ScanFilterKind::KEY_RANGE; + desc.compact_info = scan_keys_profile_string(_cond_ranges); + desc.source_filter_ids = std::move(source_filter_ids); + desc.range_count = key_range_count(_cond_ranges); + _key_range_scan_filter = 
_scan_filter_profile->register_filter(std::move(desc)); +} + Status OlapScanLocalState::_init_scanners(std::list* scanners) { if (_scan_ranges.empty()) { _eos = true; @@ -605,6 +671,10 @@ Status OlapScanLocalState::_init_scanners(std::list* scanners) { if (_cond_ranges.empty()) { _cond_ranges.emplace_back(new doris::OlapScanRange()); } + if (std::ranges::any_of(_cond_ranges, + [](const auto& range) { return range->has_lower_bound; })) { + _register_key_range_scan_filter(); + } // Filter out tablets whose partitions have been pruned by runtime filters. // @@ -1191,6 +1261,16 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() { for (const auto& it : _slot_id_to_predicates[*key_to_erase]) { if (!can_erase_predicate(*it)) { new_predicates.push_back(it); + } else if (_scan_filter_profile != nullptr) { + const auto& handle = it->scan_filter_handle(); + if (handle) { + auto& source_ids = + _slot_id_to_scan_filter_ids_for_key_range[*key_to_erase]; + if (std::find(source_ids.begin(), source_ids.end(), handle.filter_id) == + source_ids.end()) { + source_ids.push_back(handle.filter_id); + } + } } } if (new_predicates.empty()) { @@ -1211,7 +1291,9 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() { } if (state()->enable_profile()) { - custom_profile()->add_info_string("KeyRanges", _scan_keys.debug_string()); + if (_scan_filter_profile == nullptr) { + custom_profile()->add_info_string("KeyRanges", _scan_keys.debug_string()); + } custom_profile()->add_info_string("TabletIds", tablets_id_to_string(_scan_ranges)); } VLOG_CRITICAL << _scan_keys.debug_string(); @@ -1280,4 +1362,11 @@ void OlapScanLocalState::_attach_partition_boundaries() { COUNTER_SET(_total_partitions_rf_counter, parsed->total_partitions()); } +ScanRuntimeFilterPartitionPruningStats OlapScanLocalState::_runtime_filter_partition_pruning_stats() + const { + auto stats = ScanLocalStateBase::_runtime_filter_partition_pruning_stats(); + stats.pruned_tablets = 
_tablets_pruned_by_rf_counter->value(); + return stats; +} + } // namespace doris diff --git a/be/src/exec/operator/olap_scan_operator.h b/be/src/exec/operator/olap_scan_operator.h index 51c60d9c45184f..a646b04f1c4bc4 100644 --- a/be/src/exec/operator/olap_scan_operator.h +++ b/be/src/exec/operator/olap_scan_operator.h @@ -127,6 +127,7 @@ class OlapScanLocalState final : public ScanLocalState { Status _init_scanners(std::list* scanners) override; Status _build_key_ranges_and_filters(); + void _register_key_range_scan_filter(); std::vector> _scan_ranges; std::vector _sync_statistics; @@ -138,6 +139,7 @@ class OlapScanLocalState final : public ScanLocalState { std::atomic_bool _sync_tablet = false; std::vector> _cond_ranges; OlapScanKeys _scan_keys; + ScanFilterHandle _key_range_scan_filter; // If column id in this set, indicate that we need to read data after index filtering std::set _output_column_ids; @@ -337,6 +339,7 @@ class OlapScanLocalState final : public ScanLocalState { // OlapScanOperatorX (parsed once in OperatorX::prepare()). Cheap: pointer // assignment plus a counter set, no parsing work. 
void _attach_partition_boundaries(); + ScanRuntimeFilterPartitionPruningStats _runtime_filter_partition_pruning_stats() const override; RuntimeProfile::Counter* _tablets_pruned_by_rf_counter = nullptr; }; diff --git a/be/src/exec/operator/scan_operator.cpp b/be/src/exec/operator/scan_operator.cpp index 8498512b16b965..d592f7c12f55e4 100644 --- a/be/src/exec/operator/scan_operator.cpp +++ b/be/src/exec/operator/scan_operator.cpp @@ -21,6 +21,7 @@ #include #include +#include #include #include @@ -77,6 +78,14 @@ Status ScanLocalStateBase::update_late_arrival_runtime_filter(RuntimeState* stat size_t conjuncts_before = _conjuncts.size(); RETURN_IF_ERROR(_helper.try_append_late_arrival_runtime_filter(state, _parent->row_descriptor(), arrived_rf_num, _conjuncts)); + if (_scan_filter_profile != nullptr) { + for (size_t i = conjuncts_before; i < _conjuncts.size(); ++i) { + if (_conjuncts[i]->root() != nullptr && !_conjuncts[i]->scan_filter_handle()) { + _conjuncts[i]->attach_scan_filter( + _register_scan_filter(_conjuncts[i]->root(), nullptr)); + } + } + } if (state->enable_adjust_conjunct_order_by_cost()) { std::ranges::sort(_conjuncts, [](const auto& a, const auto& b) { return a->execute_cost() < b->execute_cost(); @@ -134,6 +143,12 @@ Status ScanLocalStateBase::_do_partition_pruning_by_rf() { return Status::OK(); } +ScanRuntimeFilterPartitionPruningStats ScanLocalStateBase::_runtime_filter_partition_pruning_stats() + const { + return {.total_partitions = _total_partitions_rf_counter->value(), + .pruned_partitions = _partitions_pruned_by_rf_counter->value()}; +} + int ScanLocalStateBase::max_scanners_concurrency(RuntimeState* state) const { // For select * from table limit 10; should just use one thread. 
if (should_run_serial()) { @@ -180,6 +195,9 @@ Status ScanLocalState::init(RuntimeState* state, LocalStateInfo& info) SCOPED_TIMER(_init_timer); auto& p = _parent->cast(); _max_pushdown_conditions_per_column = p._max_pushdown_conditions_per_column; + if (state->enable_profile() && state->profile_level() >= 1) { + _scan_filter_profile = std::make_shared(); + } RETURN_IF_ERROR(_helper.init(state, p.is_serial_operator(), p.node_id(), p.operator_id(), _filter_dependencies, p.get_name() + "_FILTER_DEPENDENCY")); RETURN_IF_ERROR(_init_profile()); @@ -207,6 +225,31 @@ static std::string predicates_to_string( return fmt::to_string(debug_string_buffer); } +ScanFilterHandle ScanLocalStateBase::_register_scan_filter(const VExprSPtr& root, + const SlotDescriptor* slot) { + if (_scan_filter_profile == nullptr || root == nullptr) { + return {}; + } + + ScanFilterDesc desc; + desc.kind = ScanFilterKind::NORMAL; + if (root->is_rf_wrapper()) { + desc.kind = ScanFilterKind::RUNTIME_FILTER; + desc.runtime_filter_id = assert_cast(root.get())->filter_id(); + } else if (root->is_topn_filter()) { + desc.kind = ScanFilterKind::TOPN_FILTER; + desc.topn_filter_source_node_id = assert_cast(root.get())->source_node_id(); + } + if (slot != nullptr) { + desc.slot_id = slot->id(); + desc.column_name = slot->col_name(); + desc.column_id = _parent->intermediate_row_desc().get_column_id(slot->id()); + } + desc.debug_string = root->debug_string(); + desc.compact_info = desc.debug_string; + return _scan_filter_profile->register_filter(std::move(desc)); +} + template Status ScanLocalState::open(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); @@ -251,7 +294,7 @@ Status ScanLocalState::open(RuntimeState* state) { RETURN_IF_ERROR(_process_conjuncts(state)); - if (state->enable_profile()) { + if (state->enable_profile() && _scan_filter_profile == nullptr) { custom_profile()->add_info_string("PushDownPredicates", predicates_to_string(_slot_id_to_predicates)); } @@ -354,6 +397,9 @@ Status 
ScanLocalState::_normalize_conjuncts(RuntimeState* state) { RETURN_IF_ERROR(_normalize_predicate(conjunct.get(), conjunct->root(), new_root)); if (new_root) { conjunct->set_root(new_root); + if (!conjunct->scan_filter_handle()) { + conjunct->attach_scan_filter(_register_scan_filter(conjunct->root(), nullptr)); + } if (_should_push_down_common_expr(conjunct->root())) { _common_expr_ctxs_push_down.emplace_back(conjunct); it = _conjuncts.erase(it); @@ -369,7 +415,7 @@ Status ScanLocalState::_normalize_conjuncts(RuntimeState* state) { ++it; } - if (state->enable_profile()) { + if (state->enable_profile() && _scan_filter_profile == nullptr) { std::string message; for (auto& conjunct : _conjuncts) { if (conjunct->root()) { @@ -424,6 +470,8 @@ Status ScanLocalState::_normalize_predicate(VExprContext* context, cons } if (_is_predicate_acting_on_slot(expr_root->children(), &slot, &range)) { Status status = Status::OK(); + auto& slot_predicates = _slot_id_to_predicates[slot->id()]; + const size_t predicates_before = slot_predicates.size(); std::visit( [&](auto& value_range) { auto expr = root->is_rf_wrapper() ? 
root->get_impl() : root; @@ -431,7 +479,7 @@ Status ScanLocalState::_normalize_predicate(VExprContext* context, cons Defer attach_defer = [&]() { if (pdt != PushDownType::UNACCEPTABLE && root->is_rf_wrapper()) { auto* rf_expr = assert_cast(root.get()); - _slot_id_to_predicates[slot->id()].back()->attach_profile_counter( + slot_predicates.back()->attach_profile_counter( rf_expr->filter_id(), rf_expr->predicate_filtered_rows_counter(), rf_expr->predicate_input_rows_counter(), @@ -442,43 +490,36 @@ Status ScanLocalState::_normalize_predicate(VExprContext* context, cons switch (expr->node_type()) { case TExprNodeType::IN_PRED: RETURN_IF_PUSH_DOWN( - _normalize_in_predicate(context, expr, slot, - _slot_id_to_predicates[slot->id()], + _normalize_in_predicate(context, expr, slot, slot_predicates, value_range, &pdt), status); break; case TExprNodeType::BINARY_PRED: RETURN_IF_PUSH_DOWN( _normalize_binary_predicate(context, expr, slot, - _slot_id_to_predicates[slot->id()], - value_range, &pdt), + slot_predicates, value_range, &pdt), status); break; case TExprNodeType::FUNCTION_CALL: if (expr->is_topn_filter()) { - RETURN_IF_PUSH_DOWN( - _normalize_topn_filter(context, expr, slot, - _slot_id_to_predicates[slot->id()], - &pdt), - status); + RETURN_IF_PUSH_DOWN(_normalize_topn_filter(context, expr, slot, + slot_predicates, &pdt), + status); } else { RETURN_IF_PUSH_DOWN(_normalize_is_null_predicate( - context, expr, slot, - _slot_id_to_predicates[slot->id()], + context, expr, slot, slot_predicates, value_range, &pdt), status); } break; case TExprNodeType::BITMAP_PRED: - RETURN_IF_PUSH_DOWN(_normalize_bitmap_filter( - context, root, slot, - _slot_id_to_predicates[slot->id()], &pdt), + RETURN_IF_PUSH_DOWN(_normalize_bitmap_filter(context, root, slot, + slot_predicates, &pdt), status); break; case TExprNodeType::BLOOM_PRED: - RETURN_IF_PUSH_DOWN(_normalize_bloom_filter( - context, root, slot, - _slot_id_to_predicates[slot->id()], &pdt), + 
RETURN_IF_PUSH_DOWN(_normalize_bloom_filter(context, root, slot, + slot_predicates, &pdt), status); break; default: @@ -493,6 +534,18 @@ Status ScanLocalState::_normalize_predicate(VExprContext* context, cons }, *range); RETURN_IF_ERROR(status); + if (pdt != PushDownType::UNACCEPTABLE) { + auto handle = context->scan_filter_handle(); + if (!handle) { + handle = _register_scan_filter(root, slot); + context->attach_scan_filter(handle); + } + if (handle) { + for (size_t i = predicates_before; i < slot_predicates.size(); ++i) { + slot_predicates[i]->attach_scan_filter(handle); + } + } + } } if (pdt == PushDownType::ACCEPTABLE && slotref != nullptr && slotref->data_type()->get_primitive_type() == PrimitiveType::TYPE_VARIANT) { @@ -614,7 +667,12 @@ Status ScanLocalStateBase::_normalize_function_filters(VExprContext* expr_ctx, S expr_ctx, &val, &fn_ctx, temp_pdt)); if (temp_pdt != PushDownType::UNACCEPTABLE) { std::string col = slot->col_name(); - _push_down_functions.emplace_back(opposite, col, fn_ctx, val); + auto handle = expr_ctx->scan_filter_handle(); + if (!handle) { + handle = _register_scan_filter(expr_ctx->root(), slot); + expr_ctx->attach_scan_filter(handle); + } + _push_down_functions.emplace_back(opposite, col, fn_ctx, val, handle); *pdt = temp_pdt; } } @@ -1343,7 +1401,12 @@ Status ScanLocalState::close(RuntimeState* state) { std::list> {}.swap(_scanners); COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time()); COUNTER_SET(_wait_for_rf_timer, rf_time); - _helper.collect_realtime_profile(custom_profile()); + _helper.collect_realtime_profile(custom_profile(), _scan_filter_profile.get()); + if (_scan_filter_profile != nullptr) { + _scan_filter_profile->set_runtime_filter_partition_pruning_stats( + _runtime_filter_partition_pruning_stats()); + _scan_filter_profile->materialize(custom_profile(), state->profile_level()); + } return PipelineXLocalState<>::close(state); } diff --git a/be/src/exec/operator/scan_operator.h 
b/be/src/exec/operator/scan_operator.h index bebcc7ec708f0b..6acbf17fe9afbf 100644 --- a/be/src/exec/operator/scan_operator.h +++ b/be/src/exec/operator/scan_operator.h @@ -35,6 +35,7 @@ #include "exprs/vectorized_fn_call.h" #include "exprs/vin_predicate.h" #include "runtime/descriptors.h" +#include "runtime/scan_filter_profile.h" #include "storage/predicate/filter_olap_param.h" namespace doris { @@ -98,6 +99,8 @@ class ScanLocalStateBase : public PipelineXLocalState<> { Status clone_conjunct_ctxs(VExprContextSPtrs& scanner_conjuncts); + std::shared_ptr scan_filter_profile() const { return _scan_filter_profile; } + protected: friend class ScannerContext; friend class Scanner; @@ -110,6 +113,7 @@ class ScanLocalStateBase : public PipelineXLocalState<> { virtual Status _on_runtime_filter_update(); Status _do_partition_pruning_by_rf(); + virtual ScanRuntimeFilterPartitionPruningStats _runtime_filter_partition_pruning_stats() const; std::atomic _opened {false}; @@ -131,6 +135,8 @@ class ScanLocalStateBase : public PipelineXLocalState<> { // rows read from the scanner (including those discarded by (pre)filters) RuntimeProfile::Counter* _rows_read_counter = nullptr; + std::shared_ptr _scan_filter_profile; + RuntimeProfile::Counter* _num_scanners = nullptr; RuntimeProfile::Counter* _wait_for_rf_timer = nullptr; @@ -189,6 +195,8 @@ class ScanLocalStateBase : public PipelineXLocalState<> { return Status::OK(); } + ScanFilterHandle _register_scan_filter(const VExprSPtr& root, const SlotDescriptor* slot); + // Non-templated normalize methods, moved here to avoid re-compilation per Derived type. 
Status _eval_const_conjuncts(VExprContext* expr_ctx, PushDownType* pdt); Status _normalize_bloom_filter(VExprContext* expr_ctx, const VExprSPtr& root, @@ -345,6 +353,7 @@ class ScanLocalState : public ScanLocalStateBase { // Parsed from conjuncts phmap::flat_hash_map _slot_id_to_value_range; phmap::flat_hash_map>> _slot_id_to_predicates; + phmap::flat_hash_map> _slot_id_to_scan_filter_ids_for_key_range; std::vector> _or_predicates; std::vector> _filter_dependencies; diff --git a/be/src/exec/runtime_filter/runtime_filter_consumer.cpp b/be/src/exec/runtime_filter/runtime_filter_consumer.cpp index d1a6cd7d357d37..b8ae87b3fe2921 100644 --- a/be/src/exec/runtime_filter/runtime_filter_consumer.cpp +++ b/be/src/exec/runtime_filter/runtime_filter_consumer.cpp @@ -23,6 +23,7 @@ #include "exprs/vbloom_predicate.h" #include "exprs/vdirect_in_predicate.h" #include "runtime/runtime_profile.h" +#include "runtime/scan_filter_profile.h" namespace doris { Status RuntimeFilterConsumer::_apply_ready_expr(std::vector& push_exprs) { @@ -258,4 +259,16 @@ void RuntimeFilterConsumer::collect_realtime_profile(RuntimeProfile* parent_oper c->update(_always_true_counter->value()); } +void RuntimeFilterConsumer::collect_scan_filter_profile(ScanFilterProfile* scan_filter_profile) { + std::unique_lock l(_rmtx); + DCHECK(scan_filter_profile != nullptr); + scan_filter_profile->set_runtime_filter_profile_stats( + {.runtime_filter_id = _wrapper->filter_id(), + .input_rows = _rf_input->value(), + .filtered_rows = _rf_filter->value(), + .wait_time_ns = _wait_timer->value(), + .always_true_filter_rows = _always_true_counter->value(), + .debug_string = debug_string()}); +} + } // namespace doris diff --git a/be/src/exec/runtime_filter/runtime_filter_consumer.h b/be/src/exec/runtime_filter/runtime_filter_consumer.h index 11d8a294812c07..5a56e011a2d10f 100644 --- a/be/src/exec/runtime_filter/runtime_filter_consumer.h +++ b/be/src/exec/runtime_filter/runtime_filter_consumer.h @@ -28,6 +28,8 @@ #include 
"runtime/runtime_profile.h" namespace doris { +class ScanFilterProfile; + // Work on ScanNode or MultiCastDataStreamSource, RuntimeFilterConsumerHelper will manage all RuntimeFilterConsumer // Used to create RuntimeFilterExpr to filter data class RuntimeFilterConsumer : public RuntimeFilter { @@ -68,6 +70,7 @@ class RuntimeFilterConsumer : public RuntimeFilter { // Called by RuntimeFilterConsumerHelper void collect_realtime_profile(RuntimeProfile* parent_operator_profile); + void collect_scan_filter_profile(ScanFilterProfile* scan_filter_profile); static std::string to_string(const State& state) { switch (state) { diff --git a/be/src/exec/runtime_filter/runtime_filter_consumer_helper.cpp b/be/src/exec/runtime_filter/runtime_filter_consumer_helper.cpp index 202ba27c353639..7da0f1aa807354 100644 --- a/be/src/exec/runtime_filter/runtime_filter_consumer_helper.cpp +++ b/be/src/exec/runtime_filter/runtime_filter_consumer_helper.cpp @@ -19,6 +19,7 @@ #include "exec/runtime_filter/runtime_filter_consumer.h" #include "runtime/runtime_profile.h" +#include "runtime/scan_filter_profile.h" namespace doris { RuntimeFilterConsumerHelper::RuntimeFilterConsumerHelper( @@ -129,8 +130,21 @@ Status RuntimeFilterConsumerHelper::try_append_late_arrival_runtime_filter( return Status::OK(); } -void RuntimeFilterConsumerHelper::collect_realtime_profile( - RuntimeProfile* parent_operator_profile) { +void RuntimeFilterConsumerHelper::collect_realtime_profile(RuntimeProfile* parent_operator_profile, + ScanFilterProfile* scan_filter_profile) { + if (_consumers.empty()) { + return; + } + + if (scan_filter_profile != nullptr) { + scan_filter_profile->set_runtime_filter_acquire_time( + _acquire_runtime_filter_timer->value()); + for (const auto& consumer : _consumers) { + consumer->collect_scan_filter_profile(scan_filter_profile); + } + return; + } + std::ignore = parent_operator_profile->add_counter("RuntimeFilterInfo", TUnit::NONE, RuntimeProfile::ROOT_COUNTER, 1); RuntimeProfile::Counter* c = 
parent_operator_profile->add_counter( @@ -142,4 +156,4 @@ void RuntimeFilterConsumerHelper::collect_realtime_profile( } } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/exec/runtime_filter/runtime_filter_consumer_helper.h b/be/src/exec/runtime_filter/runtime_filter_consumer_helper.h index f08636cc0749c0..5cf96752e6f305 100644 --- a/be/src/exec/runtime_filter/runtime_filter_consumer_helper.h +++ b/be/src/exec/runtime_filter/runtime_filter_consumer_helper.h @@ -24,6 +24,8 @@ #include "runtime/runtime_profile.h" namespace doris { +class ScanFilterProfile; + // this class used in ScanNode or MultiCastDataStreamSource /** * init -> acquire_runtime_filter -> try_append_late_arrival_runtime_filter @@ -49,7 +51,8 @@ class RuntimeFilterConsumerHelper { // Called by XXXLocalState::close() // parent_operator_profile is owned by LocalState so update it is safe at here. - void collect_realtime_profile(RuntimeProfile* parent_operator_profile); + void collect_realtime_profile(RuntimeProfile* parent_operator_profile, + ScanFilterProfile* scan_filter_profile = nullptr); size_t runtime_filter_nums() const { return _runtime_filter_descs.size(); } @@ -71,4 +74,4 @@ class RuntimeFilterConsumerHelper { std::unique_ptr _acquire_runtime_filter_timer = std::make_unique(TUnit::TIME_NS, 0); }; -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/exec/scan/olap_scanner.cpp b/be/src/exec/scan/olap_scanner.cpp index 33f225864da7cf..c04b413e09670d 100644 --- a/be/src/exec/scan/olap_scanner.cpp +++ b/be/src/exec/scan/olap_scanner.cpp @@ -89,6 +89,8 @@ OlapScanner::OlapScanner(ScanLocalStateBase* parent, OlapScanner::Params&& param .rs_splits {}, .return_columns {}, .output_columns {}, + .scan_filter_profile {}, + .key_range_scan_filter {}, .common_expr_ctxs_push_down {}, .topn_filter_source_node_ids {}, .key_group_cluster_key_idxes {}, @@ -340,9 +342,9 @@ Status OlapScanner::_init_tablet_reader_params( 
_tablet_reader_params.vir_cid_to_idx_in_block = _vir_cid_to_idx_in_block; _tablet_reader_params.vir_col_idx_to_type = _vir_col_idx_to_type; _tablet_reader_params.score_runtime = _score_runtime; - _tablet_reader_params.output_columns = ((OlapScanLocalState*)_local_state)->_output_column_ids; + _tablet_reader_params.output_columns = olap_local_state->_output_column_ids; _tablet_reader_params.ann_topn_runtime = _ann_topn_runtime; - for (const auto& ele : ((OlapScanLocalState*)_local_state)->_cast_types_for_variants) { + for (const auto& ele : olap_local_state->_cast_types_for_variants) { _tablet_reader_params.target_cast_type_for_variants[ele.first] = ele.second; }; auto& tablet_schema = _tablet_reader_params.tablet_schema; @@ -387,6 +389,8 @@ Status OlapScanner::_init_tablet_reader_params( _tablet_reader_params.profile = _local_state->custom_profile(); _tablet_reader_params.runtime_state = _state; + _tablet_reader_params.scan_filter_profile = _local_state->scan_filter_profile(); + _tablet_reader_params.key_range_scan_filter = olap_local_state->_key_range_scan_filter; _tablet_reader_params.origin_return_columns = &_return_columns; _tablet_reader_params.tablet_columns_convert_to_null_set = &_tablet_columns_convert_to_null_set; diff --git a/be/src/exec/scan/scanner.cpp b/be/src/exec/scan/scanner.cpp index 7d72d5838d1a83..5038a3f70c314c 100644 --- a/be/src/exec/scan/scanner.cpp +++ b/be/src/exec/scan/scanner.cpp @@ -197,7 +197,8 @@ Status Scanner::get_block(RuntimeState* state, Block* block, bool* eof) { Status Scanner::_filter_output_block(Block* block) { auto old_rows = block->rows(); - Status st = VExprContext::filter_block(_conjuncts, block, block->columns()); + Status st = VExprContext::filter_block(_conjuncts, block, block->columns(), + ScanFilterStage::EXEC_RESIDUAL); _counter.num_rows_unselected += old_rows - block->rows(); return st; } diff --git a/be/src/exprs/function_filter.h b/be/src/exprs/function_filter.h index 54bd39e223398d..13b79f8ec6fe56 100644 --- 
a/be/src/exprs/function_filter.h +++ b/be/src/exprs/function_filter.h @@ -17,20 +17,24 @@ #pragma once #include +#include #include "core/string_ref.h" // IWYU pragma: keep #include "exprs/function_context.h" +#include "runtime/scan_filter_profile.h" namespace doris { class FunctionFilter { public: FunctionFilter(bool opposite, const std::string& col_name, doris::FunctionContext* fn_ctx, - doris::StringRef string_param) + doris::StringRef string_param, + ScanFilterHandle scan_filter_handle = ScanFilterHandle()) : _opposite(opposite), _col_name(col_name), _fn_ctx(fn_ctx), - _string_param(string_param) {} + _string_param(string_param), + _scan_filter_handle(std::move(scan_filter_handle)) {} bool _opposite; std::string _col_name; @@ -38,6 +42,7 @@ class FunctionFilter { doris::FunctionContext* _fn_ctx = nullptr; // only one param from conjunct, because now only support like predicate doris::StringRef _string_param; + ScanFilterHandle _scan_filter_handle; }; } // namespace doris diff --git a/be/src/exprs/vexpr_context.cpp b/be/src/exprs/vexpr_context.cpp index 1bdbff89ae086f..d38cc8493652ed 100644 --- a/be/src/exprs/vexpr_context.cpp +++ b/be/src/exprs/vexpr_context.cpp @@ -162,6 +162,7 @@ Status VExprContext::clone(RuntimeState* state, VExprContextSPtr& new_ctx) { // segment_v2::AnnRangeSearchRuntime should be cloned as well. // The object of segment_v2::AnnRangeSearchRuntime is not shared by threads. 
new_ctx->_ann_range_search_runtime = this->_ann_range_search_runtime; + new_ctx->_scan_filter_handle = _scan_filter_handle; return _root->open(state, new_ctx.get(), FunctionContext::THREAD_LOCAL); } @@ -203,7 +204,8 @@ Status VExprContext::filter_block(VExprContext* vexpr_ctx, Block* block) { } Status VExprContext::filter_block(const VExprContextSPtrs& expr_contexts, Block* block, - size_t column_to_keep) { + size_t column_to_keep, + std::optional scan_filter_stage) { if (expr_contexts.empty() || block->rows() == 0) { return Status::OK(); } @@ -212,13 +214,15 @@ Status VExprContext::filter_block(const VExprContextSPtrs& expr_contexts, Block* std::iota(columns_to_filter.begin(), columns_to_filter.end(), 0); return execute_conjuncts_and_filter_block(expr_contexts, block, columns_to_filter, - static_cast(column_to_keep)); + static_cast(column_to_keep), scan_filter_stage); } Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs, const std::vector* filters, Block* block, - IColumn::Filter* result_filter, bool* can_filter_all) { - return execute_conjuncts(ctxs, filters, false, block, result_filter, can_filter_all); + IColumn::Filter* result_filter, bool* can_filter_all, + std::optional scan_filter_stage) { + return execute_conjuncts(ctxs, filters, false, block, result_filter, can_filter_all, + scan_filter_stage); } Status VExprContext::execute_filter(const Block* block, uint8_t* __restrict result_filter_data, @@ -230,14 +234,27 @@ Status VExprContext::execute_filter(const Block* block, uint8_t* __restrict resu Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs, const std::vector* filters, bool accept_null, const Block* block, - IColumn::Filter* result_filter, bool* can_filter_all) { + IColumn::Filter* result_filter, bool* can_filter_all, + std::optional scan_filter_stage) { size_t rows = block->rows(); DCHECK_EQ(result_filter->size(), rows); *can_filter_all = false; auto* __restrict result_filter_data = result_filter->data(); for (const 
auto& ctx : ctxs) { + const bool collect_scan_filter_stats = + scan_filter_stage.has_value() && ctx->scan_filter_handle(); + const int64_t input_rows = + collect_scan_filter_stats + ? std::count(result_filter_data, result_filter_data + rows, 1) + : 0; RETURN_IF_ERROR( ctx->execute_filter(block, result_filter_data, rows, accept_null, can_filter_all)); + if (collect_scan_filter_stats) { + const int64_t output_rows = + *can_filter_all ? 0 + : std::count(result_filter_data, result_filter_data + rows, 1); + ctx->scan_filter_handle().stats->record(*scan_filter_stage, input_rows, output_rows); + } if (*can_filter_all) { return Status::OK(); } @@ -311,16 +328,16 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& conjuncts, const // TODO Performance Optimization // need exception safety -Status VExprContext::execute_conjuncts_and_filter_block(const VExprContextSPtrs& ctxs, Block* block, - std::vector& columns_to_filter, - int column_to_keep) { +Status VExprContext::execute_conjuncts_and_filter_block( + const VExprContextSPtrs& ctxs, Block* block, std::vector& columns_to_filter, + int column_to_keep, std::optional scan_filter_stage) { IColumn::Filter result_filter(block->rows(), 1); bool can_filter_all; _reset_memory_usage(ctxs); - RETURN_IF_ERROR( - execute_conjuncts(ctxs, nullptr, false, block, &result_filter, &can_filter_all)); + RETURN_IF_ERROR(execute_conjuncts(ctxs, nullptr, false, block, &result_filter, &can_filter_all, + scan_filter_stage)); // Accumulate the usage of `result_filter` into the first context. 
if (!ctxs.empty()) { @@ -356,14 +373,15 @@ Status VExprContext::execute_conjuncts_and_filter_block(const VExprContextSPtrs& return Status::OK(); } -Status VExprContext::execute_conjuncts_and_filter_block(const VExprContextSPtrs& ctxs, Block* block, - std::vector& columns_to_filter, - int column_to_keep, - IColumn::Filter& filter) { +Status VExprContext::execute_conjuncts_and_filter_block( + const VExprContextSPtrs& ctxs, Block* block, std::vector& columns_to_filter, + int column_to_keep, IColumn::Filter& filter, + std::optional scan_filter_stage) { _reset_memory_usage(ctxs); filter.resize_fill(block->rows(), 1); bool can_filter_all; - RETURN_IF_ERROR(execute_conjuncts(ctxs, nullptr, false, block, &filter, &can_filter_all)); + RETURN_IF_ERROR(execute_conjuncts(ctxs, nullptr, false, block, &filter, &can_filter_all, + scan_filter_stage)); // Accumulate the usage of `result_filter` into the first context. if (!ctxs.empty()) { diff --git a/be/src/exprs/vexpr_context.h b/be/src/exprs/vexpr_context.h index 84900c024636cf..cbf8438cdc8757 100644 --- a/be/src/exprs/vexpr_context.h +++ b/be/src/exprs/vexpr_context.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -35,6 +36,7 @@ #include "exprs/function_context.h" #include "exprs/vexpr_fwd.h" #include "runtime/runtime_state.h" +#include "runtime/scan_filter_profile.h" #include "storage/index/ann/ann_range_search_runtime.h" #include "storage/index/ann/ann_search_params.h" #include "storage/index/inverted/inverted_index_reader.h" @@ -263,6 +265,10 @@ class VExprContext { std::shared_ptr get_index_context() const { return _index_context; } + void attach_scan_filter(ScanFilterHandle handle) { _scan_filter_handle = std::move(handle); } + + const ScanFilterHandle& scan_filter_handle() const { return _scan_filter_handle; } + /// Creates a FunctionContext, and returns the index that's passed to fn_context() to /// retrieve the created context. 
Exprs that need a FunctionContext should call this in /// Prepare() and save the returned index. 'varargs_buffer_size', if specified, is the @@ -293,30 +299,32 @@ class VExprContext { [[nodiscard]] static Status filter_block(VExprContext* vexpr_ctx, Block* block); - [[nodiscard]] static Status filter_block(const VExprContextSPtrs& expr_contexts, Block* block, - size_t column_to_keep); + [[nodiscard]] static Status filter_block( + const VExprContextSPtrs& expr_contexts, Block* block, size_t column_to_keep, + std::optional scan_filter_stage = std::nullopt); - [[nodiscard]] static Status execute_conjuncts(const VExprContextSPtrs& ctxs, - const std::vector* filters, - bool accept_null, const Block* block, - IColumn::Filter* result_filter, - bool* can_filter_all); + [[nodiscard]] static Status execute_conjuncts( + const VExprContextSPtrs& ctxs, const std::vector* filters, + bool accept_null, const Block* block, IColumn::Filter* result_filter, + bool* can_filter_all, std::optional scan_filter_stage = std::nullopt); [[nodiscard]] static Status execute_conjuncts(const VExprContextSPtrs& conjuncts, const Block* block, ColumnUInt8& null_map, IColumn::Filter& result_filter); - static Status execute_conjuncts(const VExprContextSPtrs& ctxs, - const std::vector* filters, Block* block, - IColumn::Filter* result_filter, bool* can_filter_all); + static Status execute_conjuncts( + const VExprContextSPtrs& ctxs, const std::vector* filters, + Block* block, IColumn::Filter* result_filter, bool* can_filter_all, + std::optional scan_filter_stage = std::nullopt); [[nodiscard]] static Status execute_conjuncts_and_filter_block( const VExprContextSPtrs& ctxs, Block* block, std::vector& columns_to_filter, - int column_to_keep); + int column_to_keep, std::optional scan_filter_stage = std::nullopt); - static Status execute_conjuncts_and_filter_block(const VExprContextSPtrs& ctxs, Block* block, - std::vector& columns_to_filter, - int column_to_keep, IColumn::Filter& filter); + static Status 
execute_conjuncts_and_filter_block( + const VExprContextSPtrs& ctxs, Block* block, std::vector& columns_to_filter, + int column_to_keep, IColumn::Filter& filter, + std::optional scan_filter_stage = std::nullopt); [[nodiscard]] static Status get_output_block_after_execute_exprs(const VExprContextSPtrs&, const Block&, Block*, @@ -356,6 +364,7 @@ class VExprContext { _last_result_column_id = other._last_result_column_id; _depth_num = other._depth_num; + _scan_filter_handle = other._scan_filter_handle; return *this; } @@ -368,6 +377,7 @@ class VExprContext { _fn_contexts = std::move(other._fn_contexts); _last_result_column_id = other._last_result_column_id; _depth_num = other._depth_num; + /* move-assignment: transfer the handle instead of copying its shared_ptr */ _scan_filter_handle = std::move(other._scan_filter_handle); return *this; } @@ -421,6 +431,7 @@ class VExprContext { int _depth_num = 0; std::shared_ptr _index_context; + ScanFilterHandle _scan_filter_handle; size_t _memory_usage = 0; segment_v2::AnnRangeSearchRuntime _ann_range_search_runtime; diff --git a/be/src/runtime/scan_filter_profile.cpp b/be/src/runtime/scan_filter_profile.cpp new file mode 100644 index 00000000000000..d14103459aeb99 --- /dev/null +++ b/be/src/runtime/scan_filter_profile.cpp @@ -0,0 +1,638 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/scan_filter_profile.h" + +#include +#include + +#include +#include + +#include "runtime/runtime_profile.h" + +namespace doris { + +namespace { + +constexpr const char* SCAN_FILTER_INFO = "ScanFilterInfo"; +constexpr const char* KEY_RANGE_INFO = "KeyRangeInfo"; +constexpr const char* RUNTIME_FILTER_PARTITION_PRUNING = "RuntimeFilterPartitionPruning"; + +bool is_index_stage(ScanFilterStage stage) { + return stage == ScanFilterStage::INDEX_INVERTED || + stage == ScanFilterStage::INDEX_BLOOM_FILTER || + stage == ScanFilterStage::INDEX_ZONE_MAP || stage == ScanFilterStage::INDEX_DICT || + stage == ScanFilterStage::INDEX_ANN; +} + +bool is_exec_stage(ScanFilterStage stage) { + return stage == ScanFilterStage::EXEC_VECTOR || stage == ScanFilterStage::EXEC_SHORT_CIRCUIT || + stage == ScanFilterStage::EXEC_COMMON_EXPR || stage == ScanFilterStage::EXEC_RESIDUAL; +} + +std::string join_ids(const std::vector& ids) { + std::stringstream ss; + for (size_t i = 0; i < ids.size(); ++i) { + if (i != 0) { + ss << ","; + } + ss << ids[i]; + } + return ss.str(); +} + +void set_counter(RuntimeProfile* profile, const std::string& name, TUnit::type type, + const std::string& parent, int64_t level, int64_t value) { + auto* counter = profile->add_counter(name, type, parent, level); + counter->set(value); +} + +void set_root_counter(RuntimeProfile* profile, const std::string& name, TUnit::type type, + int64_t level, int64_t value) { + set_counter(profile, name, type, RuntimeProfile::ROOT_COUNTER, level, value); +} + +RuntimeProfile* get_or_create_child(RuntimeProfile* profile, const std::string& name) { + auto* child = profile->get_child(name); + if (child != nullptr) { + return child; + } + return profile->create_child(name, true, false); +} + +void add_info_string_if_not_empty(RuntimeProfile* profile, const std::string& key, + const std::string& value) { + if 
(!value.empty()) { + profile->add_info_string(key, value); + } +} + +int stage_display_order(ScanFilterStage stage) { + switch (stage) { + case ScanFilterStage::KEY_RANGE: + return 1; + case ScanFilterStage::INDEX_INVERTED: + return 2; + case ScanFilterStage::INDEX_ANN: + return 3; + case ScanFilterStage::INDEX_DICT: + return 4; + case ScanFilterStage::INDEX_BLOOM_FILTER: + return 5; + case ScanFilterStage::INDEX_ZONE_MAP: + return 6; + case ScanFilterStage::EXEC_VECTOR: + return 7; + case ScanFilterStage::EXEC_SHORT_CIRCUIT: + return 8; + case ScanFilterStage::EXEC_COMMON_EXPR: + return 9; + case ScanFilterStage::EXEC_RESIDUAL: + return 10; + case ScanFilterStage::NUM_STAGES: + break; + } + return 99; +} + +const char* stage_profile_name(ScanFilterStage stage) { + return scan_filter_stage_name(stage); +} + +const char* scan_filter_source_name(ScanFilterKind kind) { + switch (kind) { + case ScanFilterKind::NORMAL: + return "Conjunct"; + case ScanFilterKind::RUNTIME_FILTER: + return "RuntimeFilter"; + case ScanFilterKind::TOPN_FILTER: + return "TopNFilter"; + case ScanFilterKind::KEY_RANGE: + return "KeyRange"; + case ScanFilterKind::UNKNOWN: + return "Unknown"; + } + return "Unknown"; +} + +bool contains_id(const std::vector& ids, int32_t id) { + return std::find(ids.begin(), ids.end(), id) != ids.end(); +} + +const ScanRuntimeFilterProfileStats* find_runtime_filter_stats( + const std::vector& stats, int32_t runtime_filter_id) { + for (const auto& item : stats) { + if (item.runtime_filter_id == runtime_filter_id) { + return &item; + } + } + return nullptr; +} + +bool has_runtime_filter_partition_pruning_stats( + const ScanRuntimeFilterPartitionPruningStats& stats) { + return stats.total_partitions > 0 || stats.pruned_partitions > 0 || stats.pruned_tablets > 0; +} + +constexpr std::array(ScanFilterStage::NUM_STAGES)> +ordered_stages() { + return {ScanFilterStage::KEY_RANGE, ScanFilterStage::INDEX_INVERTED, + ScanFilterStage::INDEX_ANN, ScanFilterStage::INDEX_DICT, + 
ScanFilterStage::INDEX_BLOOM_FILTER, ScanFilterStage::INDEX_ZONE_MAP, + ScanFilterStage::EXEC_VECTOR, ScanFilterStage::EXEC_SHORT_CIRCUIT, + ScanFilterStage::EXEC_COMMON_EXPR, ScanFilterStage::EXEC_RESIDUAL}; +} + +struct SummaryStats { + bool participated = false; + bool has_filtering_stage = false; + bool has_time = false; + int first_stage = 99; + int last_stage = -1; + int64_t input_rows = 0; + int64_t output_rows = 0; + int64_t filtered_rows = 0; + int64_t time_ns = 0; +}; + +void update_summary(SummaryStats* summary, ScanFilterStage stage, + const ScanFilterStageStatsSnapshot& stats) { + if (!stats.participated) { + return; + } + const auto order = stage_display_order(stage); + if (stats.filtered_rows > 0) { + if (!summary->has_filtering_stage || order < summary->first_stage) { + summary->first_stage = order; + summary->input_rows = stats.input_rows; + } + if (!summary->has_filtering_stage || order > summary->last_stage) { + summary->last_stage = order; + summary->output_rows = stats.output_rows; + } + summary->has_filtering_stage = true; + } else if (!summary->has_filtering_stage && + (!summary->participated || order > summary->last_stage)) { + summary->first_stage = order; + summary->last_stage = order; + summary->input_rows = stats.input_rows; + summary->output_rows = stats.output_rows; + } + summary->participated = true; + summary->filtered_rows += stats.filtered_rows; + if (stats.has_time) { + summary->has_time = true; + summary->time_ns += stats.time_ns; + } +} + +struct MaterializedFilterSnapshot { + ScanFilterDesc desc; + std::array(ScanFilterStage::NUM_STAGES)> + stage_snapshots; + SummaryStats total; + SummaryStats index; + SummaryStats exec; +}; + +void materialize_filter_stage(RuntimeProfile* filter_profile, ScanFilterStage stage, + const ScanFilterStageStatsSnapshot& stats) { + auto* stage_profile = get_or_create_child(filter_profile, stage_profile_name(stage)); + set_root_counter(stage_profile, "InputRows", TUnit::UNIT, 2, stats.input_rows); + 
set_root_counter(stage_profile, "FilteredRows", TUnit::UNIT, 2, stats.filtered_rows); + if (stats.has_time) { + set_root_counter(stage_profile, "Time", TUnit::TIME_NS, 2, stats.time_ns); + } +} + +std::string scan_filter_stages_string(const MaterializedFilterSnapshot& snapshot, + bool is_key_range_source) { + std::vector stages; + for (const auto stage : ordered_stages()) { + if (snapshot.stage_snapshots[static_cast(stage)].participated) { + stages.emplace_back(stage_profile_name(stage)); + } + } + if (stages.empty()) { + return is_key_range_source ? "KeyRangeInfo" : "NotApplied"; + } + + std::stringstream ss; + for (size_t i = 0; i < stages.size(); ++i) { + if (i != 0) { + ss << " -> "; + } + ss << stages[i]; + } + return ss.str(); +} + +std::string target_string(const ScanFilterDesc& desc) { + std::stringstream ss; + if (desc.slot_id >= 0) { + ss << "slot=" << desc.slot_id; + } + if (desc.column_id >= 0) { + if (ss.tellp() > 0) { + ss << ", "; + } + ss << "column_id=" << desc.column_id; + } + if (!desc.column_name.empty()) { + if (ss.tellp() > 0) { + ss << ", "; + } + ss << "column=" << desc.column_name; + } + return ss.str(); +} + +std::string source_string(const ScanFilterDesc& desc) { + if (desc.kind == ScanFilterKind::RUNTIME_FILTER) { + return fmt::format("{} rf_id={}", scan_filter_source_name(desc.kind), + desc.runtime_filter_id); + } + if (desc.kind == ScanFilterKind::TOPN_FILTER) { + return fmt::format("{} source_node_id={}", scan_filter_source_name(desc.kind), + desc.topn_filter_source_node_id); + } + return scan_filter_source_name(desc.kind); +} + +void materialize_filter_counters(RuntimeProfile* filter_profile, + const MaterializedFilterSnapshot& snapshot, int profile_level, + bool is_key_range_source, + const ScanRuntimeFilterProfileStats* runtime_filter_stats) { + filter_profile->add_info_string("Source", source_string(snapshot.desc)); + add_info_string_if_not_empty(filter_profile, "Target", target_string(snapshot.desc)); + 
filter_profile->add_info_string("Stages", + scan_filter_stages_string(snapshot, is_key_range_source)); + add_info_string_if_not_empty(filter_profile, "Expr", snapshot.desc.compact_info); + if (profile_level >= 2 && runtime_filter_stats != nullptr && + !runtime_filter_stats->debug_string.empty()) { + filter_profile->add_info_string("RuntimeFilterInfo", runtime_filter_stats->debug_string); + } + if (profile_level >= 2 && !snapshot.desc.debug_string.empty() && + snapshot.desc.debug_string != snapshot.desc.compact_info) { + filter_profile->add_info_string("DebugString", snapshot.desc.debug_string); + } + if (profile_level >= 2 && !snapshot.desc.source_filter_ids.empty()) { + filter_profile->add_info_string("SourceFilterIds", + join_ids(snapshot.desc.source_filter_ids)); + } + + if (snapshot.total.participated) { + set_root_counter(filter_profile, "InputRows", TUnit::UNIT, 1, snapshot.total.input_rows); + set_root_counter(filter_profile, "FilteredRows", TUnit::UNIT, 1, + snapshot.total.filtered_rows); + if (snapshot.total.has_time) { + set_root_counter(filter_profile, "FilterTime", TUnit::TIME_NS, 1, + snapshot.total.time_ns); + } + } + if (runtime_filter_stats != nullptr) { + if (!snapshot.total.participated && runtime_filter_stats->input_rows > 0) { + DCHECK_GE(runtime_filter_stats->input_rows, runtime_filter_stats->filtered_rows); + set_root_counter(filter_profile, "InputRows", TUnit::UNIT, 1, + runtime_filter_stats->input_rows); + set_root_counter(filter_profile, "FilteredRows", TUnit::UNIT, 1, + runtime_filter_stats->filtered_rows); + } + set_root_counter(filter_profile, "WaitTime", TUnit::TIME_NS, 1, + runtime_filter_stats->wait_time_ns); + set_root_counter(filter_profile, "AlwaysTrueFilterRows", TUnit::UNIT, 1, + runtime_filter_stats->always_true_filter_rows); + } + + if (profile_level < 2) { + return; + } + for (const auto stage : ordered_stages()) { + const auto& stage_stats = snapshot.stage_snapshots[static_cast(stage)]; + if (!stage_stats.participated) { + 
continue; + } + materialize_filter_stage(filter_profile, stage, stage_stats); + } +} + +void materialize_runtime_filter_partition_pruning( + RuntimeProfile* scan_filter_profile, const ScanRuntimeFilterPartitionPruningStats& stats) { + auto* pruning_profile = + get_or_create_child(scan_filter_profile, RUNTIME_FILTER_PARTITION_PRUNING); + if (stats.total_partitions > 0) { + set_root_counter(pruning_profile, "TotalPartitions", TUnit::UNIT, 1, + stats.total_partitions); + } + if (stats.pruned_partitions > 0) { + set_root_counter(pruning_profile, "PrunedPartitions", TUnit::UNIT, 1, + stats.pruned_partitions); + } + if (stats.pruned_tablets > 0) { + set_root_counter(pruning_profile, "PrunedTablets", TUnit::UNIT, 1, stats.pruned_tablets); + } +} + +int first_profile_order(const MaterializedFilterSnapshot& snapshot, bool is_key_range_source) { + if (snapshot.total.participated) { + return snapshot.total.first_stage; + } + if (is_key_range_source) { + return 1; + } + return 99; +} + +int64_t first_profile_input_rows(const MaterializedFilterSnapshot& snapshot, + [[maybe_unused]] int order) { + // Sort tie-breaker: the input-row count stored in the filter's total summary. + // The former participated/first_stage check returned this same value on both paths. + return snapshot.total.input_rows; +} + +} // namespace + +const char* scan_filter_kind_name(ScanFilterKind kind) { + switch (kind) { + case ScanFilterKind::NORMAL: + return "NORMAL"; + case ScanFilterKind::RUNTIME_FILTER: + return "RUNTIME_FILTER"; + case ScanFilterKind::TOPN_FILTER: + return "TOPN_FILTER"; + case ScanFilterKind::KEY_RANGE: + return "KEY_RANGE"; + case ScanFilterKind::UNKNOWN: + return "UNKNOWN"; + } + return "UNKNOWN"; +} + +const char* scan_filter_stage_name(ScanFilterStage stage) { + switch (stage) { + case ScanFilterStage::KEY_RANGE: + return "KeyRange"; + case ScanFilterStage::INDEX_INVERTED: + return "IndexInverted"; + case ScanFilterStage::INDEX_BLOOM_FILTER: + return "IndexBloomFilter"; + case ScanFilterStage::INDEX_ZONE_MAP: + return "IndexZoneMap"; + case
ScanFilterStage::INDEX_DICT: + return "IndexDict"; + case ScanFilterStage::INDEX_ANN: + return "IndexAnn"; + case ScanFilterStage::EXEC_VECTOR: + return "ExecuteVector"; + case ScanFilterStage::EXEC_SHORT_CIRCUIT: + return "ExecuteShortCircuit"; + case ScanFilterStage::EXEC_COMMON_EXPR: + return "ExecuteCommonExpr"; + case ScanFilterStage::EXEC_RESIDUAL: + return "ExecuteResidual"; + case ScanFilterStage::NUM_STAGES: + break; + } + return "Unknown"; +} + +void ScanFilterStats::record(ScanFilterStage stage, int64_t input_rows, int64_t output_rows, + std::optional time_ns) { + DCHECK_GE(input_rows, output_rows); + const auto stage_index = static_cast(stage); + DCHECK_LT(stage_index, _stage_stats.size()); + auto& stats = _stage_stats[stage_index]; + stats.participated.store(true, std::memory_order_relaxed); + stats.calls.fetch_add(1, std::memory_order_relaxed); + stats.input_rows.fetch_add(input_rows, std::memory_order_relaxed); + stats.output_rows.fetch_add(output_rows, std::memory_order_relaxed); + stats.filtered_rows.fetch_add(input_rows - output_rows, std::memory_order_relaxed); + if (time_ns.has_value()) { + stats.has_time.store(true, std::memory_order_relaxed); + stats.time_ns.fetch_add(*time_ns, std::memory_order_relaxed); + } +} + +ScanFilterStageStatsSnapshot ScanFilterStats::snapshot(ScanFilterStage stage) const { + const auto stage_index = static_cast(stage); + DCHECK_LT(stage_index, _stage_stats.size()); + const auto& stats = _stage_stats[stage_index]; + return {.participated = stats.participated.load(std::memory_order_relaxed), + .has_time = stats.has_time.load(std::memory_order_relaxed), + .calls = stats.calls.load(std::memory_order_relaxed), + .input_rows = stats.input_rows.load(std::memory_order_relaxed), + .output_rows = stats.output_rows.load(std::memory_order_relaxed), + .filtered_rows = stats.filtered_rows.load(std::memory_order_relaxed), + .time_ns = stats.time_ns.load(std::memory_order_relaxed)}; +} + +ScanFilterHandle 
ScanFilterProfile::register_filter(ScanFilterDesc desc) { + auto stats = std::make_shared(); + std::lock_guard lock(_lock); + const auto filter_id = static_cast(_descs.size()); + desc.filter_id = filter_id; + _descs.emplace_back(std::move(desc)); + _stats.emplace_back(stats); + return {.filter_id = filter_id, .stats = std::move(stats)}; +} + +void ScanFilterProfile::add_source_filter(int32_t filter_id, int32_t source_filter_id) { + std::lock_guard lock(_lock); + DCHECK_GE(filter_id, 0); + DCHECK_LT(filter_id, _descs.size()); + auto& source_ids = _descs[filter_id].source_filter_ids; + if (std::find(source_ids.begin(), source_ids.end(), source_filter_id) == source_ids.end()) { + source_ids.push_back(source_filter_id); + } +} + +std::vector ScanFilterProfile::_snapshots() const { + std::lock_guard lock(_lock); + std::vector snapshots; + snapshots.reserve(_descs.size()); + for (size_t i = 0; i < _descs.size(); ++i) { + snapshots.push_back({.desc = _descs[i], .stats = _stats[i]}); + } + return snapshots; +} + +std::vector ScanFilterProfile::_runtime_filter_stats_snapshots() + const { + std::lock_guard lock(_lock); + return _runtime_filter_stats; +} + +int64_t ScanFilterProfile::_runtime_filter_acquire_time_snapshot() const { + std::lock_guard lock(_lock); + return _runtime_filter_acquire_time_ns; +} + +ScanRuntimeFilterPartitionPruningStats +ScanFilterProfile::_runtime_filter_partition_pruning_stats_snapshot() const { + std::lock_guard lock(_lock); + return _runtime_filter_partition_pruning_stats; +} + +void ScanFilterProfile::set_runtime_filter_acquire_time(int64_t acquire_time_ns) { + std::lock_guard lock(_lock); + _runtime_filter_acquire_time_ns = acquire_time_ns; +} + +void ScanFilterProfile::set_runtime_filter_profile_stats(ScanRuntimeFilterProfileStats stats) { + std::lock_guard lock(_lock); + DCHECK_GE(stats.runtime_filter_id, 0); + const auto has_scan_filter_desc = std::ranges::any_of(_descs, [&](const auto& desc) { + return desc.kind == 
ScanFilterKind::RUNTIME_FILTER && + desc.runtime_filter_id == stats.runtime_filter_id; + }); + if (!has_scan_filter_desc) { + ScanFilterDesc desc; + desc.filter_id = static_cast(_descs.size()); + desc.kind = ScanFilterKind::RUNTIME_FILTER; + desc.runtime_filter_id = stats.runtime_filter_id; + _descs.emplace_back(std::move(desc)); + _stats.emplace_back(std::make_shared()); + } + for (auto& item : _runtime_filter_stats) { + if (item.runtime_filter_id == stats.runtime_filter_id) { + item = std::move(stats); + return; + } + } + _runtime_filter_stats.emplace_back(std::move(stats)); +} + +void ScanFilterProfile::set_runtime_filter_partition_pruning_stats( + ScanRuntimeFilterPartitionPruningStats stats) { + std::lock_guard lock(_lock); + _runtime_filter_partition_pruning_stats = stats; +} + +void ScanFilterProfile::materialize(RuntimeProfile* profile, int profile_level) const { + if (profile == nullptr || profile_level <= 0) { + return; + } + + const auto snapshots = _snapshots(); + const auto runtime_filter_stats = _runtime_filter_stats_snapshots(); + const auto runtime_filter_acquire_time_ns = _runtime_filter_acquire_time_snapshot(); + const auto runtime_filter_partition_pruning_stats = + _runtime_filter_partition_pruning_stats_snapshot(); + std::vector scan_filter_snapshots; + std::optional key_range_snapshot; + scan_filter_snapshots.reserve(snapshots.size()); + + for (const auto& snapshot : snapshots) { + MaterializedFilterSnapshot materialized; + materialized.desc = snapshot.desc; + for (int i = 0; i < static_cast(ScanFilterStage::NUM_STAGES); ++i) { + const auto stage = static_cast(i); + materialized.stage_snapshots[i] = snapshot.stats->snapshot(stage); + update_summary(&materialized.total, stage, materialized.stage_snapshots[i]); + if (is_index_stage(stage)) { + update_summary(&materialized.index, stage, materialized.stage_snapshots[i]); + } else if (is_exec_stage(stage)) { + update_summary(&materialized.exec, stage, materialized.stage_snapshots[i]); + } + } + + if 
(materialized.desc.kind == ScanFilterKind::KEY_RANGE) { + DCHECK(!key_range_snapshot.has_value()); + key_range_snapshot = std::move(materialized); + } else { + scan_filter_snapshots.emplace_back(std::move(materialized)); + } + } + + if (key_range_snapshot.has_value()) { + auto* key_range_profile = get_or_create_child(profile, KEY_RANGE_INFO); + const auto& snapshot = *key_range_snapshot; + if (!snapshot.desc.source_filter_ids.empty()) { + key_range_profile->add_info_string("SourceFilterIds", + join_ids(snapshot.desc.source_filter_ids)); + } + if (profile_level >= 2) { + add_info_string_if_not_empty(key_range_profile, "ScanKeys", snapshot.desc.compact_info); + } + + set_root_counter(key_range_profile, "RangeNum", TUnit::UNIT, 1, snapshot.desc.range_count); + set_root_counter(key_range_profile, "InputRows", TUnit::UNIT, 1, snapshot.total.input_rows); + set_root_counter(key_range_profile, "FilteredRows", TUnit::UNIT, 1, + snapshot.total.filtered_rows); + } + + const bool has_partition_pruning_stats = + has_runtime_filter_partition_pruning_stats(runtime_filter_partition_pruning_stats); + if (scan_filter_snapshots.empty() && !has_partition_pruning_stats && + runtime_filter_acquire_time_ns <= 0) { + return; + } + + std::vector key_range_source_filter_ids; + if (key_range_snapshot.has_value()) { + key_range_source_filter_ids = key_range_snapshot->desc.source_filter_ids; + } + + std::ranges::sort(scan_filter_snapshots, [&](const auto& left, const auto& right) { + const bool left_key_range_source = + contains_id(key_range_source_filter_ids, left.desc.filter_id); + const bool right_key_range_source = + contains_id(key_range_source_filter_ids, right.desc.filter_id); + const int left_order = first_profile_order(left, left_key_range_source); + const int right_order = first_profile_order(right, right_key_range_source); + if (left_order != right_order) { + return left_order < right_order; + } + const int64_t left_input_rows = first_profile_input_rows(left, left_order); + const 
int64_t right_input_rows = first_profile_input_rows(right, right_order); + if (left_input_rows != right_input_rows) { + return left_input_rows > right_input_rows; + } + return left.desc.filter_id < right.desc.filter_id; + }); + + auto* scan_filter_profile = get_or_create_child(profile, SCAN_FILTER_INFO); + if (runtime_filter_acquire_time_ns > 0) { + set_root_counter(scan_filter_profile, "RuntimeFilterAcquireTime", TUnit::TIME_NS, 2, + runtime_filter_acquire_time_ns); + } + if (has_partition_pruning_stats) { + materialize_runtime_filter_partition_pruning(scan_filter_profile, + runtime_filter_partition_pruning_stats); + } + + if (scan_filter_snapshots.empty()) { + return; + } + + for (const auto& snapshot : scan_filter_snapshots) { + auto* filter_profile = get_or_create_child( + scan_filter_profile, fmt::format("ScanFilter {}", snapshot.desc.filter_id)); + const auto* runtime_filter_stat = + snapshot.desc.kind == ScanFilterKind::RUNTIME_FILTER + ? find_runtime_filter_stats(runtime_filter_stats, + snapshot.desc.runtime_filter_id) + : nullptr; + materialize_filter_counters( + filter_profile, snapshot, profile_level, + contains_id(key_range_source_filter_ids, snapshot.desc.filter_id), + runtime_filter_stat); + } +} + +} // namespace doris diff --git a/be/src/runtime/scan_filter_profile.h b/be/src/runtime/scan_filter_profile.h new file mode 100644 index 00000000000000..d8d2628b4f13fc --- /dev/null +++ b/be/src/runtime/scan_filter_profile.h @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace doris { + +class RuntimeProfile; + +enum class ScanFilterKind { + NORMAL, + RUNTIME_FILTER, + TOPN_FILTER, + KEY_RANGE, + UNKNOWN, +}; + +enum class ScanFilterStage { + KEY_RANGE = 0, + INDEX_INVERTED, + INDEX_BLOOM_FILTER, + INDEX_ZONE_MAP, + INDEX_DICT, + INDEX_ANN, + EXEC_VECTOR, + EXEC_SHORT_CIRCUIT, + EXEC_COMMON_EXPR, + EXEC_RESIDUAL, + NUM_STAGES, +}; + +struct ScanFilterDesc { + int32_t filter_id = -1; + ScanFilterKind kind = ScanFilterKind::UNKNOWN; + int32_t runtime_filter_id = -1; + int32_t topn_filter_source_node_id = -1; + int32_t slot_id = -1; + int32_t column_id = -1; + std::string column_name; + std::string compact_info; + std::string debug_string; + std::vector source_filter_ids; + int64_t range_count = 0; +}; + +struct ScanRuntimeFilterProfileStats { + int32_t runtime_filter_id = -1; + int64_t input_rows = 0; + int64_t filtered_rows = 0; + int64_t wait_time_ns = 0; + int64_t always_true_filter_rows = 0; + std::string debug_string; +}; + +struct ScanRuntimeFilterPartitionPruningStats { + int64_t total_partitions = 0; + int64_t pruned_partitions = 0; + int64_t pruned_tablets = 0; +}; + +struct ScanFilterStageStatsSnapshot { + bool participated = false; + bool has_time = false; + int64_t calls = 0; + int64_t input_rows = 0; + int64_t output_rows = 0; + int64_t filtered_rows = 0; + int64_t time_ns = 0; +}; + +class ScanFilterStats { +public: + void record(ScanFilterStage stage, int64_t input_rows, int64_t output_rows, + std::optional 
time_ns = std::nullopt); + + ScanFilterStageStatsSnapshot snapshot(ScanFilterStage stage) const; + +private: + struct StageStats { + std::atomic calls = 0; + std::atomic input_rows = 0; + std::atomic output_rows = 0; + std::atomic filtered_rows = 0; + std::atomic time_ns = 0; + std::atomic has_time = false; + std::atomic participated = false; + }; + + static constexpr size_t STAGE_NUM = static_cast(ScanFilterStage::NUM_STAGES); + std::array _stage_stats; +}; + +struct ScanFilterHandle { + int32_t filter_id = -1; + std::shared_ptr stats; + + explicit operator bool() const { return filter_id >= 0 && stats != nullptr; } +}; + +class ScanFilterProfile { +public: + ScanFilterHandle register_filter(ScanFilterDesc desc); + void add_source_filter(int32_t filter_id, int32_t source_filter_id); + void set_runtime_filter_acquire_time(int64_t acquire_time_ns); + void set_runtime_filter_profile_stats(ScanRuntimeFilterProfileStats stats); + void set_runtime_filter_partition_pruning_stats(ScanRuntimeFilterPartitionPruningStats stats); + void materialize(RuntimeProfile* profile, int profile_level) const; + +private: + struct FilterSnapshot { + ScanFilterDesc desc; + std::shared_ptr stats; + }; + + std::vector _snapshots() const; + std::vector _runtime_filter_stats_snapshots() const; + int64_t _runtime_filter_acquire_time_snapshot() const; + ScanRuntimeFilterPartitionPruningStats _runtime_filter_partition_pruning_stats_snapshot() const; + + mutable std::mutex _lock; + std::vector _descs; + std::vector> _stats; + std::vector _runtime_filter_stats; + int64_t _runtime_filter_acquire_time_ns = 0; + ScanRuntimeFilterPartitionPruningStats _runtime_filter_partition_pruning_stats; +}; + +const char* scan_filter_kind_name(ScanFilterKind kind); +const char* scan_filter_stage_name(ScanFilterStage stage); + +} // namespace doris diff --git a/be/src/storage/iterators.h b/be/src/storage/iterators.h index 7c0b78959d79bc..c6f0f728b934f0 100644 --- a/be/src/storage/iterators.h +++ 
b/be/src/storage/iterators.h @@ -26,6 +26,7 @@ #include "exprs/vexpr.h" #include "io/io_common.h" #include "runtime/runtime_state.h" +#include "runtime/scan_filter_profile.h" #include "storage/index/ann/ann_topn_runtime.h" #include "storage/olap_common.h" #include "storage/predicate/block_column_predicate.h" @@ -38,6 +39,7 @@ namespace doris { class Schema; class ColumnPredicate; +class ScanFilterProfile; struct IteratorRowRef; @@ -109,6 +111,8 @@ class StorageReadOptions { // REQUIRED (null is not allowed) OlapReaderStatistics* stats = nullptr; + std::shared_ptr scan_filter_profile; + ScanFilterHandle key_range_scan_filter; bool use_page_cache = false; uint32_t block_row_max = 4096 - 32; // see https://github.com/apache/doris/pull/11816 // Effective adaptive batch size byte budget. diff --git a/be/src/storage/predicate/block_column_predicate.cpp b/be/src/storage/predicate/block_column_predicate.cpp index 60bf1fd37e79cd..8d0acbf4bd0d37 100644 --- a/be/src/storage/predicate/block_column_predicate.cpp +++ b/be/src/storage/predicate/block_column_predicate.cpp @@ -28,6 +28,24 @@ namespace segment_v2 { class InvertedIndexIterator; } // namespace segment_v2 +namespace { + +template +bool evaluate_and_with_scan_filter( + const std::vector>& predicates, ScanFilterStage stage, + int64_t input_rows, EvaluateFunc&& evaluate_func) { + for (auto& predicate : predicates) { + const bool matched = evaluate_func(*predicate); + predicate->record_scan_filter(stage, input_rows, matched ? 
input_rows : 0); + if (!matched) { + return false; + } + } + return true; +} + +} // namespace + uint16_t SingleColumnBlockPredicate::evaluate(MutableColumns& block, uint16_t* sel, uint16_t selected_size) const { auto column_id = _predicate->column_id(); @@ -181,6 +199,15 @@ bool AndBlockColumnPredicate::evaluate_and(const segment_v2::ZoneMap& zone_map) return true; } +bool AndBlockColumnPredicate::evaluate_and_with_scan_filter(const segment_v2::ZoneMap& zone_map, + ScanFilterStage stage, + int64_t input_rows) const { + return doris::evaluate_and_with_scan_filter(_block_column_predicate_vec, stage, input_rows, + [&zone_map](const BlockColumnPredicate& predicate) { + return predicate.evaluate_and(zone_map); + }); +} + bool AndBlockColumnPredicate::evaluate_and(const segment_v2::BloomFilter* bf) const { for (auto& block_column_predicate : _block_column_predicate_vec) { if (!block_column_predicate->evaluate_and(bf)) { @@ -190,6 +217,14 @@ bool AndBlockColumnPredicate::evaluate_and(const segment_v2::BloomFilter* bf) co return true; } +bool AndBlockColumnPredicate::evaluate_and_with_scan_filter(const segment_v2::BloomFilter* bf, + ScanFilterStage stage, + int64_t input_rows) const { + return doris::evaluate_and_with_scan_filter( + _block_column_predicate_vec, stage, input_rows, + [bf](const BlockColumnPredicate& predicate) { return predicate.evaluate_and(bf); }); +} + bool AndBlockColumnPredicate::evaluate_and(const StringRef* dict_words, const size_t dict_num) const { for (auto& predicate : _block_column_predicate_vec) { @@ -200,6 +235,17 @@ bool AndBlockColumnPredicate::evaluate_and(const StringRef* dict_words, return true; } +bool AndBlockColumnPredicate::evaluate_and_with_scan_filter(const StringRef* dict_words, + const size_t dict_num, + ScanFilterStage stage, + int64_t input_rows) const { + return doris::evaluate_and_with_scan_filter( + _block_column_predicate_vec, stage, input_rows, + [dict_words, dict_num](const BlockColumnPredicate& predicate) { + return 
predicate.evaluate_and(dict_words, dict_num); + }); +} + void AndBlockColumnPredicate::evaluate_or(MutableColumns& block, uint16_t* sel, uint16_t selected_size, bool* flags) const { if (num_of_column_predicate() == 1) { diff --git a/be/src/storage/predicate/block_column_predicate.h b/be/src/storage/predicate/block_column_predicate.h index ac25ab3151db28..2d723e0e9b5534 100644 --- a/be/src/storage/predicate/block_column_predicate.h +++ b/be/src/storage/predicate/block_column_predicate.h @@ -60,6 +60,15 @@ class BlockColumnPredicate { virtual void get_all_column_predicate( std::set>& predicate_set) const = 0; + virtual ScanFilterHandle scan_filter_handle() const { return {}; } + virtual bool has_scan_filter() const { return static_cast(scan_filter_handle()); } + virtual void record_scan_filter(ScanFilterStage stage, int64_t input_rows, + int64_t output_rows) const { + if (auto handle = scan_filter_handle()) { + handle.stats->record(stage, input_rows, output_rows); + } + } + virtual uint16_t evaluate(MutableColumns& block, uint16_t* sel, uint16_t selected_size) const { return selected_size; } @@ -126,6 +135,10 @@ class SingleColumnBlockPredicate : public BlockColumnPredicate { predicate_set.insert(_predicate); } + ScanFilterHandle scan_filter_handle() const override { + return _predicate->scan_filter_handle(); + } + uint16_t evaluate(MutableColumns& block, uint16_t* sel, uint16_t selected_size) const override; void evaluate_and(MutableColumns& block, uint16_t* sel, uint16_t selected_size, bool* flags) const override; @@ -176,6 +189,22 @@ class MutilColumnBlockPredicate : public BlockColumnPredicate { size_t num_of_column_predicate() const { return _block_column_predicate_vec.size(); } + bool has_scan_filter() const override { + for (const auto& child_block_predicate : _block_column_predicate_vec) { + if (child_block_predicate->has_scan_filter()) { + return true; + } + } + return false; + } + + void record_scan_filter(ScanFilterStage stage, int64_t input_rows, + 
int64_t output_rows) const override { + for (const auto& child_block_predicate : _block_column_predicate_vec) { + child_block_predicate->record_scan_filter(stage, input_rows, output_rows); + } + } + void get_all_column_ids(std::set& column_id_set) const override { for (auto& child_block_predicate : _block_column_predicate_vec) { child_block_predicate->get_all_column_ids(column_id_set); @@ -235,10 +264,19 @@ class AndBlockColumnPredicate : public MutilColumnBlockPredicate { bool evaluate_and(const segment_v2::ZoneMap& zone_map) const override; + bool evaluate_and_with_scan_filter(const segment_v2::ZoneMap& zone_map, ScanFilterStage stage, + int64_t input_rows) const; + bool evaluate_and(const segment_v2::BloomFilter* bf) const override; + bool evaluate_and_with_scan_filter(const segment_v2::BloomFilter* bf, ScanFilterStage stage, + int64_t input_rows) const; + bool evaluate_and(const StringRef* dict_words, const size_t dict_num) const override; + bool evaluate_and_with_scan_filter(const StringRef* dict_words, const size_t dict_num, + ScanFilterStage stage, int64_t input_rows) const; + bool evaluate_and(ParquetPredicate::ColumnStat* statistic) const override { for (auto& block_column_predicate : _block_column_predicate_vec) { if (!block_column_predicate->evaluate_and(statistic)) { diff --git a/be/src/storage/predicate/column_predicate.h b/be/src/storage/predicate/column_predicate.h index 9e8d9e1f921ae2..b0dc409b381328 100644 --- a/be/src/storage/predicate/column_predicate.h +++ b/be/src/storage/predicate/column_predicate.h @@ -27,6 +27,7 @@ #include "exprs/runtime_filter_expr.h" #include "format/parquet/parquet_predicate.h" #include "runtime/runtime_profile.h" +#include "runtime/scan_filter_profile.h" #include "storage/index/bloom_filter/bloom_filter.h" #include "storage/index/inverted/inverted_index_iterator.h" #include "storage/index/zone_map/zone_map_index.h" @@ -317,6 +318,10 @@ class ColumnPredicate : public std::enable_shared_from_this { bool opposite() const { 
return _opposite; } + void attach_scan_filter(ScanFilterHandle handle) { _scan_filter_handle = std::move(handle); } + + const ScanFilterHandle& scan_filter_handle() const { return _scan_filter_handle; } + void attach_profile_counter( int filter_id, std::shared_ptr predicate_filtered_rows_counter, std::shared_ptr predicate_input_rows_counter, @@ -426,6 +431,8 @@ class ColumnPredicate : public std::enable_shared_from_this { // reset_judge_selectivity is used to reset these variables. mutable RuntimeFilterSelectivity _rf_selectivity; + ScanFilterHandle _scan_filter_handle; + std::shared_ptr _predicate_filtered_rows_counter = std::make_shared(TUnit::UNIT, 0); std::shared_ptr _predicate_input_rows_counter = diff --git a/be/src/storage/rowset/beta_rowset_reader.cpp b/be/src/storage/rowset/beta_rowset_reader.cpp index 717a555264a0d1..916aa2930aa741 100644 --- a/be/src/storage/rowset/beta_rowset_reader.cpp +++ b/be/src/storage/rowset/beta_rowset_reader.cpp @@ -218,6 +218,8 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context _read_options.io_ctx.reader_type = _read_context->reader_type; _read_options.io_ctx.file_cache_stats = &_stats->file_cache_stats; _read_options.runtime_state = _read_context->runtime_state; + _read_options.scan_filter_profile = _read_context->scan_filter_profile; + _read_options.key_range_scan_filter = _read_context->key_range_scan_filter; _read_options.output_columns = _read_context->output_columns; _read_options.io_ctx.reader_type = _read_context->reader_type; _read_options.io_ctx.is_disposable = _read_context->reader_type != ReaderType::READER_QUERY; diff --git a/be/src/storage/rowset/rowset_reader_context.h b/be/src/storage/rowset/rowset_reader_context.h index d82488424a64c8..005ad6c4674eb8 100644 --- a/be/src/storage/rowset/rowset_reader_context.h +++ b/be/src/storage/rowset/rowset_reader_context.h @@ -35,6 +35,7 @@ namespace doris { class RowCursor; class DeleteBitmap; class DeleteHandler; +class ScanFilterProfile; 
class TabletSchema; struct RowsetReaderContext { @@ -69,6 +70,8 @@ struct RowsetReaderContext { const DeleteHandler* delete_handler = nullptr; OlapReaderStatistics* stats = nullptr; RuntimeState* runtime_state = nullptr; + std::shared_ptr scan_filter_profile; + ScanFilterHandle key_range_scan_filter; VExprContextSPtrs common_expr_ctxs_push_down; bool use_page_cache = false; int sequence_id_idx = -1; diff --git a/be/src/storage/segment/column_reader.cpp b/be/src/storage/segment/column_reader.cpp index de526a432b89d1..8933253a81cf40 100644 --- a/be/src/storage/segment/column_reader.cpp +++ b/be/src/storage/segment/column_reader.cpp @@ -89,6 +89,17 @@ inline bool read_as_string(PrimitiveType type) { type == PrimitiveType::TYPE_BITMAP || type == PrimitiveType::TYPE_FIXED_LENGTH_OBJECT; } +namespace { + +int64_t row_ranges_intersection_count(const RowRanges& row_ranges, ordinal_t from, ordinal_t to) { + RowRanges page_row_ranges(RowRanges::create_single(from, to)); + RowRanges intersected_row_ranges; + RowRanges::ranges_intersection(row_ranges, page_row_ranges, &intersected_row_ranges); + return cast_set(intersected_row_ranges.count()); +} + +} // namespace + Status ColumnReader::create_array(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, const io::FileReaderSPtr& file_reader, std::shared_ptr* reader) { @@ -438,9 +449,11 @@ Status ColumnReader::get_row_ranges_by_zone_map( const std::vector>* delete_predicates, RowRanges* row_ranges, const ColumnIteratorOptions& iter_opts) { std::vector page_indexes; - RETURN_IF_ERROR( - _get_filtered_pages(col_predicates, delete_predicates, &page_indexes, iter_opts)); - RETURN_IF_ERROR(_calculate_row_ranges(page_indexes, row_ranges, iter_opts)); + RETURN_IF_ERROR(_get_filtered_pages(col_predicates, delete_predicates, *row_ranges, + &page_indexes, iter_opts)); + RowRanges zone_map_row_ranges; + RETURN_IF_ERROR(_calculate_row_ranges(page_indexes, &zone_map_row_ranges, iter_opts)); + RowRanges::ranges_intersection(*row_ranges, 
zone_map_row_ranges, row_ranges); return Status::OK(); } @@ -515,18 +528,43 @@ bool ColumnReader::_zone_map_match_condition(const ZoneMap& zone_map, Status ColumnReader::_get_filtered_pages( const AndBlockColumnPredicate* col_predicates, const std::vector>* delete_predicates, - std::vector* page_indexes, const ColumnIteratorOptions& iter_opts) { + const RowRanges& input_row_ranges, std::vector* page_indexes, + const ColumnIteratorOptions& iter_opts) { RETURN_IF_ERROR(_load_zone_map_index(_use_index_page_cache, _opts.kept_in_memory, iter_opts)); + const bool collect_scan_filter_stats = col_predicates->has_scan_filter(); + if (collect_scan_filter_stats) { + RETURN_IF_ERROR( + _load_ordinal_index(_use_index_page_cache, _opts.kept_in_memory, iter_opts)); + } const std::vector& zone_maps = _zone_map_index->page_zone_maps(); - size_t page_size = _zone_map_index->num_pages(); - for (size_t i = 0; i < page_size; ++i) { + int page_size = cast_set(_zone_map_index->num_pages()); + for (int i = 0; i < page_size; ++i) { + int64_t page_input_rows = 0; + if (collect_scan_filter_stats) { + const ordinal_t page_first_id = _ordinal_index->get_first_ordinal(i); + const ordinal_t page_last_id = _ordinal_index->get_last_ordinal(i) + 1; + page_input_rows = + row_ranges_intersection_count(input_row_ranges, page_first_id, page_last_id); + if (page_input_rows == 0) { + continue; + } + } if (zone_maps[i].pass_all()) { + if (collect_scan_filter_stats) { + col_predicates->record_scan_filter(ScanFilterStage::INDEX_ZONE_MAP, page_input_rows, + page_input_rows); + } page_indexes->push_back(cast_set(i)); } else { segment_v2::ZoneMap zone_map; RETURN_IF_ERROR(ZoneMap::from_proto(zone_maps[i], _data_type, zone_map)); - if (_zone_map_match_condition(zone_map, col_predicates)) { + const bool matched = + collect_scan_filter_stats + ? 
col_predicates->evaluate_and_with_scan_filter( + zone_map, ScanFilterStage::INDEX_ZONE_MAP, page_input_rows) + : col_predicates->evaluate_and(zone_map); + if (matched) { bool should_read = true; if (delete_predicates != nullptr) { for (auto del_pred : *delete_predicates) { @@ -575,6 +613,7 @@ Status ColumnReader::get_row_ranges_by_bloom_filter(const AndBlockColumnPredicat std::unique_ptr bf_iter; RETURN_IF_ERROR(_bloom_filter_index->new_iterator(&bf_iter, iter_opts.stats)); size_t range_size = row_ranges->range_size(); + const bool collect_scan_filter_stats = col_predicates->has_scan_filter(); // get covered page ids std::set page_ids; for (int i = 0; i < range_size; ++i) { @@ -591,7 +630,19 @@ Status ColumnReader::get_row_ranges_by_bloom_filter(const AndBlockColumnPredicat for (auto& pid : page_ids) { std::unique_ptr bf; RETURN_IF_ERROR(bf_iter->read_bloom_filter(pid, &bf)); - if (col_predicates->evaluate_and(bf.get())) { + if (collect_scan_filter_stats) { + const ordinal_t page_first_id = _ordinal_index->get_first_ordinal(pid); + const ordinal_t page_last_id = _ordinal_index->get_last_ordinal(pid) + 1; + const int64_t page_input_rows = + row_ranges_intersection_count(*row_ranges, page_first_id, page_last_id); + if (page_input_rows == 0) { + continue; + } + if (col_predicates->evaluate_and_with_scan_filter( + bf.get(), ScanFilterStage::INDEX_BLOOM_FILTER, page_input_rows)) { + bf_row_ranges.add(RowRange(page_first_id, page_last_id)); + } + } else if (col_predicates->evaluate_and(bf.get())) { bf_row_ranges.add(RowRange(_ordinal_index->get_first_ordinal(pid), _ordinal_index->get_last_ordinal(pid) + 1)); } @@ -2503,7 +2554,13 @@ Status FileColumnIterator::get_row_ranges_by_dict(const AndBlockColumnPredicate* CHECK_NOTNULL(_dict_decoder); } - if (!col_predicates->evaluate_and(_dict_word_info.get(), _dict_decoder->count())) { + const bool matched = + col_predicates->has_scan_filter() + ? 
col_predicates->evaluate_and_with_scan_filter( + _dict_word_info.get(), _dict_decoder->count(), + ScanFilterStage::INDEX_DICT, cast_set(row_ranges->count())) + : col_predicates->evaluate_and(_dict_word_info.get(), _dict_decoder->count()); + if (!matched) { row_ranges->clear(); } return Status::OK(); diff --git a/be/src/storage/segment/column_reader.h b/be/src/storage/segment/column_reader.h index 64c60008dd52d2..3e5f2d06acf9b8 100644 --- a/be/src/storage/segment/column_reader.h +++ b/be/src/storage/segment/column_reader.h @@ -261,7 +261,8 @@ class ColumnReader : public MetadataAdder, Status _get_filtered_pages( const AndBlockColumnPredicate* col_predicates, const std::vector>* delete_predicates, - std::vector* page_indexes, const ColumnIteratorOptions& iter_opts); + const RowRanges& input_row_ranges, std::vector* page_indexes, + const ColumnIteratorOptions& iter_opts); Status _calculate_row_ranges(const std::vector& page_indexes, RowRanges* row_ranges, const ColumnIteratorOptions& iter_opts); diff --git a/be/src/storage/segment/segment_iterator.cpp b/be/src/storage/segment/segment_iterator.cpp index dc6930777f5508..6a377575e21e59 100644 --- a/be/src/storage/segment/segment_iterator.cpp +++ b/be/src/storage/segment/segment_iterator.cpp @@ -830,7 +830,13 @@ Status SegmentIterator::_get_row_ranges_by_keys() { } size_t pre_size = _row_bitmap.cardinality(); _row_bitmap &= RowRanges::ranges_to_roaring(result_ranges); - _opts.stats->rows_key_range_filtered += (pre_size - _row_bitmap.cardinality()); + const size_t post_size = _row_bitmap.cardinality(); + _opts.stats->rows_key_range_filtered += (pre_size - post_size); + if (_opts.key_range_scan_filter) { + _opts.key_range_scan_filter.stats->record(ScanFilterStage::KEY_RANGE, + static_cast(pre_size), + static_cast(post_size)); + } return Status::OK(); } @@ -1190,10 +1196,8 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row continue; } // get row ranges by bf index of this column, - RowRanges 
column_bf_row_ranges = RowRanges::create_single(num_rows()); RETURN_IF_ERROR(_column_iterators[cid]->get_row_ranges_by_bloom_filter( - _opts.col_id_to_predicates.at(cid).get(), &column_bf_row_ranges)); - RowRanges::ranges_intersection(bf_row_ranges, column_bf_row_ranges, &bf_row_ranges); + _opts.col_id_to_predicates.at(cid).get(), &bf_row_ranges)); } pre_size = condition_row_ranges->count(); @@ -1217,16 +1221,12 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row continue; } // get row ranges by zone map of this column, - RowRanges column_row_ranges = RowRanges::create_single(num_rows()); RETURN_IF_ERROR(_column_iterators[cid]->get_row_ranges_by_zone_map( _opts.col_id_to_predicates.at(cid).get(), _opts.del_predicates_for_zone_map.count(cid) > 0 ? &(_opts.del_predicates_for_zone_map.at(cid)) : nullptr, - &column_row_ranges)); - // intersect different columns's row ranges to get final row ranges by zone map - RowRanges::ranges_intersection(zone_map_row_ranges, column_row_ranges, - &zone_map_row_ranges); + &zone_map_row_ranges)); } pre_size = condition_row_ranges->count(); @@ -1331,7 +1331,14 @@ Status SegmentIterator::_apply_index_expr() { !_opts.runtime_state->query_options().__isset.enable_ann_index_result_cache || _opts.runtime_state->query_options().enable_ann_index_result_cache; + std::unique_ptr scan_filter_profile_bitmap; for (const auto& expr_ctx : _common_expr_ctxs_push_down) { + const bool collect_scan_filter_stats = static_cast(expr_ctx->scan_filter_handle()); + if (collect_scan_filter_stats && scan_filter_profile_bitmap == nullptr) { + scan_filter_profile_bitmap = std::make_unique(_row_bitmap); + } + const size_t origin_rows = + collect_scan_filter_stats ? 
scan_filter_profile_bitmap->cardinality() : 0; if (Status st = expr_ctx->evaluate_inverted_index(num_rows()); !st.ok()) { if (_downgrade_without_index(st) || st.code() == ErrorCode::NOT_IMPLEMENTED_ERROR) { continue; @@ -1343,6 +1350,17 @@ Status SegmentIterator::_apply_index_expr() { return st; } } + if (collect_scan_filter_stats && expr_ctx->get_index_context() != nullptr && + expr_ctx->root() != nullptr) { + const auto* result = expr_ctx->get_index_context()->get_index_result_for_expr( + expr_ctx->root().get()); + if (result != nullptr && result->get_data_bitmap() != nullptr) { + *scan_filter_profile_bitmap &= *result->get_data_bitmap(); + expr_ctx->scan_filter_handle().stats->record( + ScanFilterStage::INDEX_INVERTED, static_cast(origin_rows), + static_cast(scan_filter_profile_bitmap->cardinality())); + } + } } // Evaluate inverted index for virtual column MATCH expressions (projections). @@ -1374,6 +1392,11 @@ Status SegmentIterator::_apply_index_expr() { _common_expr_to_slotref_map, _row_bitmap, ann_index_stats, enable_ann_index_result_cache, &ann_range_search_executed)); if (ann_range_search_executed) { + if (expr_ctx->scan_filter_handle()) { + expr_ctx->scan_filter_handle().stats->record( + ScanFilterStage::INDEX_ANN, static_cast(origin_rows), + static_cast(_row_bitmap.cardinality())); + } _opts.stats->ann_index_range_search_cnt++; } _opts.stats->rows_ann_index_range_filtered += (origin_rows - _row_bitmap.cardinality()); @@ -1453,6 +1476,7 @@ Status SegmentIterator::_apply_inverted_index_on_column_predicate( } else { bool need_remaining_after_evaluate = _column_has_fulltext_index(pred->column_id()) && PredicateTypeTraits::is_equal_or_list(pred->type()); + const size_t rows_before = _row_bitmap.cardinality(); Status res = pred->evaluate(_storage_name_and_type[pred->column_id()], _index_iterators[pred->column_id()].get(), num_rows(), &_row_bitmap); @@ -1466,6 +1490,12 @@ Status SegmentIterator::_apply_inverted_index_on_column_predicate( << ", error msg: " << 
res; return res; } + const size_t rows_after = _row_bitmap.cardinality(); + if (pred->scan_filter_handle()) { + pred->scan_filter_handle().stats->record(ScanFilterStage::INDEX_INVERTED, + static_cast(rows_before), + static_cast(rows_after)); + } if (_row_bitmap.isEmpty()) { // all rows have been pruned, no need to process further predicates @@ -2616,6 +2646,10 @@ uint16_t SegmentIterator::_evaluate_vectorization_predicate(uint16_t* sel_rowid_ all_pred_always_true = false; } else { pred->update_filter_info(0, 0, selected_size); + if (pred->scan_filter_handle()) { + pred->scan_filter_handle().stats->record(ScanFilterStage::EXEC_VECTOR, + selected_size, selected_size); + } } } @@ -2633,6 +2667,8 @@ uint16_t SegmentIterator::_evaluate_vectorization_predicate(uint16_t* sel_rowid_ _ret_flags.resize(original_size); DCHECK(!_pre_eval_block_predicate.empty()); bool is_first = true; + int64_t current_selected_rows = original_size; + const bool collect_scan_filter_stats = _opts.scan_filter_profile != nullptr; for (auto& pred : _pre_eval_block_predicate) { if (pred->always_true()) { continue; @@ -2645,6 +2681,15 @@ uint16_t SegmentIterator::_evaluate_vectorization_predicate(uint16_t* sel_rowid_ } else { pred->evaluate_and_vec(*column, original_size, (bool*)_ret_flags.data()); } + if (collect_scan_filter_stats) { + const int64_t output_rows = + std::count(_ret_flags.begin(), _ret_flags.begin() + original_size, 1); + if (pred->scan_filter_handle()) { + pred->scan_filter_handle().stats->record(ScanFilterStage::EXEC_VECTOR, + current_selected_rows, output_rows); + } + current_selected_rows = output_rows; + } } uint16_t new_size = 0; @@ -2694,7 +2739,12 @@ uint16_t SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_ro for (auto predicate : _short_cir_eval_predicate) { auto column_id = predicate->column_id(); auto& short_cir_column = _current_return_columns[column_id]; + const uint16_t rows_before = selected_size; selected_size = 
predicate->evaluate(*short_cir_column, vec_sel_rowid_idx, selected_size); + if (predicate->scan_filter_handle()) { + predicate->scan_filter_handle().stats->record(ScanFilterStage::EXEC_SHORT_CIRCUIT, + rows_before, selected_size); + } } _opts.stats->short_circuit_cond_input_rows += original_size; @@ -3222,7 +3272,8 @@ Status SegmentIterator::_execute_common_expr(uint16_t* sel_rowid_idx, uint16_t& IColumn::Filter filter; RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( - _common_expr_ctxs_push_down, block, _columns_to_filter, prev_columns, filter)); + _common_expr_ctxs_push_down, block, _columns_to_filter, prev_columns, filter, + ScanFilterStage::EXEC_COMMON_EXPR)); selected_size = _evaluate_common_expr_filter(sel_rowid_idx, selected_size, filter); _opts.stats->rows_expr_cond_filtered += original_size - selected_size; diff --git a/be/src/storage/segment/segment_iterator.h b/be/src/storage/segment/segment_iterator.h index c7faf3fdb51d17..e2907917330520 100644 --- a/be/src/storage/segment/segment_iterator.h +++ b/be/src/storage/segment/segment_iterator.h @@ -133,8 +133,10 @@ class SegmentIterator : public RowwiseIterator { int64_t tablet_id() const { return _tablet_id; } void update_profile(RuntimeProfile* profile) override { - _update_profile(profile, _short_cir_eval_predicate, "ShortCircuitPredicates"); - _update_profile(profile, _pre_eval_block_predicate, "PreEvaluatePredicates"); + if (_opts.scan_filter_profile == nullptr) { + _update_profile(profile, _short_cir_eval_predicate, "ShortCircuitPredicates"); + _update_profile(profile, _pre_eval_block_predicate, "PreEvaluatePredicates"); + } if (_opts.delete_condition_predicates != nullptr) { std::set> delete_predicate_set; diff --git a/be/src/storage/tablet/tablet_reader.cpp b/be/src/storage/tablet/tablet_reader.cpp index 635fc9b3d7daba..8eb950da05ef78 100644 --- a/be/src/storage/tablet/tablet_reader.cpp +++ b/be/src/storage/tablet/tablet_reader.cpp @@ -232,6 +232,8 @@ Status 
TabletReader::_init_params(const ReaderParams& read_params) { _tablet = read_params.tablet; _tablet_schema = read_params.tablet_schema; _reader_context.runtime_state = read_params.runtime_state; + _reader_context.scan_filter_profile = read_params.scan_filter_profile; + _reader_context.key_range_scan_filter = read_params.key_range_scan_filter; _reader_context.target_cast_type_for_variants = read_params.target_cast_type_for_variants; RETURN_IF_ERROR(_init_conditions_param(read_params)); @@ -505,8 +507,10 @@ std::shared_ptr TabletReader::_parse_to_predicate( return nullptr; } const TabletColumn& column = materialize_column(_tablet_schema->column(index)); - return create_column_predicate(index, std::make_shared(function_filter), - column.type(), &column); + auto predicate = create_column_predicate( + index, std::make_shared(function_filter), column.type(), &column); + predicate->attach_scan_filter(function_filter._scan_filter_handle); + return predicate; } Status TabletReader::_init_delete_condition(const ReaderParams& read_params) { diff --git a/be/src/storage/tablet/tablet_reader.h b/be/src/storage/tablet/tablet_reader.h index 64e1a0f2897637..f2762478239b04 100644 --- a/be/src/storage/tablet/tablet_reader.h +++ b/be/src/storage/tablet/tablet_reader.h @@ -57,6 +57,7 @@ class ColumnPredicate; class DeleteBitmap; class HybridSetBase; class RuntimeProfile; +class ScanFilterProfile; class VCollectIterator; class Block; @@ -161,6 +162,8 @@ class TabletReader { std::set output_columns; RuntimeProfile* profile = nullptr; RuntimeState* runtime_state = nullptr; + std::shared_ptr scan_filter_profile; + ScanFilterHandle key_range_scan_filter; // use only in vec exec engine std::vector* origin_return_columns = nullptr;