2121#include < gen_cpp/Exprs_types.h>
2222#include < gen_cpp/Metrics_types.h>
2323
24+ #include < algorithm>
2425#include < cstdint>
2526#include < memory>
2627
@@ -77,6 +78,14 @@ Status ScanLocalStateBase::update_late_arrival_runtime_filter(RuntimeState* stat
7778 size_t conjuncts_before = _conjuncts.size ();
7879 RETURN_IF_ERROR (_helper.try_append_late_arrival_runtime_filter (state, _parent->row_descriptor (),
7980 arrived_rf_num, _conjuncts));
81+ if (_scan_filter_profile != nullptr ) {
82+ for (size_t i = conjuncts_before; i < _conjuncts.size (); ++i) {
83+ if (_conjuncts[i]->root () != nullptr && !_conjuncts[i]->scan_filter_handle ()) {
84+ _conjuncts[i]->attach_scan_filter (
85+ _register_scan_filter (_conjuncts[i]->root (), nullptr ));
86+ }
87+ }
88+ }
8089 if (state->enable_adjust_conjunct_order_by_cost ()) {
8190 std::ranges::sort (_conjuncts, [](const auto & a, const auto & b) {
8291 return a->execute_cost () < b->execute_cost ();
@@ -134,6 +143,12 @@ Status ScanLocalStateBase::_do_partition_pruning_by_rf() {
134143 return Status::OK ();
135144}
136145
146+ ScanRuntimeFilterPartitionPruningStats ScanLocalStateBase::_runtime_filter_partition_pruning_stats ()
147+ const {
148+ return {.total_partitions = _total_partitions_rf_counter->value (),
149+ .pruned_partitions = _partitions_pruned_by_rf_counter->value ()};
150+ }
151+
137152int ScanLocalStateBase::max_scanners_concurrency (RuntimeState* state) const {
138153 // For select * from table limit 10; should just use one thread.
139154 if (should_run_serial ()) {
@@ -180,6 +195,9 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info)
180195 SCOPED_TIMER (_init_timer);
181196 auto & p = _parent->cast <typename Derived::Parent>();
182197 _max_pushdown_conditions_per_column = p._max_pushdown_conditions_per_column ;
198+ if (state->enable_profile () && state->profile_level () >= 1 ) {
199+ _scan_filter_profile = std::make_shared<ScanFilterProfile>();
200+ }
183201 RETURN_IF_ERROR (_helper.init (state, p.is_serial_operator (), p.node_id (), p.operator_id (),
184202 _filter_dependencies, p.get_name () + " _FILTER_DEPENDENCY" ));
185203 RETURN_IF_ERROR (_init_profile ());
@@ -207,6 +225,31 @@ static std::string predicates_to_string(
207225 return fmt::to_string (debug_string_buffer);
208226}
209227
228+ ScanFilterHandle ScanLocalStateBase::_register_scan_filter (const VExprSPtr& root,
229+ const SlotDescriptor* slot) {
230+ if (_scan_filter_profile == nullptr || root == nullptr ) {
231+ return {};
232+ }
233+
234+ ScanFilterDesc desc;
235+ desc.kind = ScanFilterKind::NORMAL ;
236+ if (root->is_rf_wrapper ()) {
237+ desc.kind = ScanFilterKind::RUNTIME_FILTER ;
238+ desc.runtime_filter_id = assert_cast<RuntimeFilterExpr*>(root.get ())->filter_id ();
239+ } else if (root->is_topn_filter ()) {
240+ desc.kind = ScanFilterKind::TOPN_FILTER ;
241+ desc.topn_filter_source_node_id = assert_cast<VTopNPred*>(root.get ())->source_node_id ();
242+ }
243+ if (slot != nullptr ) {
244+ desc.slot_id = slot->id ();
245+ desc.column_name = slot->col_name ();
246+ desc.column_id = _parent->intermediate_row_desc ().get_column_id (slot->id ());
247+ }
248+ desc.debug_string = root->debug_string ();
249+ desc.compact_info = desc.debug_string ;
250+ return _scan_filter_profile->register_filter (std::move (desc));
251+ }
252+
210253template <typename Derived>
211254Status ScanLocalState<Derived>::open(RuntimeState* state) {
212255 SCOPED_TIMER (exec_time_counter ());
@@ -251,7 +294,7 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
251294
252295 RETURN_IF_ERROR (_process_conjuncts (state));
253296
254- if (state->enable_profile ()) {
297+ if (state->enable_profile () && _scan_filter_profile == nullptr ) {
255298 custom_profile ()->add_info_string (" PushDownPredicates" ,
256299 predicates_to_string (_slot_id_to_predicates));
257300 }
@@ -354,6 +397,9 @@ Status ScanLocalState<Derived>::_normalize_conjuncts(RuntimeState* state) {
354397 RETURN_IF_ERROR (_normalize_predicate (conjunct.get (), conjunct->root (), new_root));
355398 if (new_root) {
356399 conjunct->set_root (new_root);
400+ if (!conjunct->scan_filter_handle ()) {
401+ conjunct->attach_scan_filter (_register_scan_filter (conjunct->root (), nullptr ));
402+ }
357403 if (_should_push_down_common_expr (conjunct->root ())) {
358404 _common_expr_ctxs_push_down.emplace_back (conjunct);
359405 it = _conjuncts.erase (it);
@@ -369,7 +415,7 @@ Status ScanLocalState<Derived>::_normalize_conjuncts(RuntimeState* state) {
369415 ++it;
370416 }
371417
372- if (state->enable_profile ()) {
418+ if (state->enable_profile () && _scan_filter_profile == nullptr ) {
373419 std::string message;
374420 for (auto & conjunct : _conjuncts) {
375421 if (conjunct->root ()) {
@@ -424,14 +470,16 @@ Status ScanLocalState<Derived>::_normalize_predicate(VExprContext* context, cons
424470 }
425471 if (_is_predicate_acting_on_slot (expr_root->children (), &slot, &range)) {
426472 Status status = Status::OK ();
473+ auto & slot_predicates = _slot_id_to_predicates[slot->id ()];
474+ const size_t predicates_before = slot_predicates.size ();
427475 std::visit (
428476 [&](auto & value_range) {
429477 auto expr = root->is_rf_wrapper () ? root->get_impl () : root;
430478 {
431479 Defer attach_defer = [&]() {
432480 if (pdt != PushDownType::UNACCEPTABLE && root->is_rf_wrapper ()) {
433481 auto * rf_expr = assert_cast<RuntimeFilterExpr*>(root.get ());
434- _slot_id_to_predicates[slot-> id ()] .back ()->attach_profile_counter (
482+ slot_predicates .back ()->attach_profile_counter (
435483 rf_expr->filter_id (),
436484 rf_expr->predicate_filtered_rows_counter (),
437485 rf_expr->predicate_input_rows_counter (),
@@ -442,43 +490,36 @@ Status ScanLocalState<Derived>::_normalize_predicate(VExprContext* context, cons
442490 switch (expr->node_type ()) {
443491 case TExprNodeType::IN_PRED :
444492 RETURN_IF_PUSH_DOWN (
445- _normalize_in_predicate (context, expr, slot,
446- _slot_id_to_predicates[slot->id ()],
493+ _normalize_in_predicate (context, expr, slot, slot_predicates,
447494 value_range, &pdt),
448495 status);
449496 break ;
450497 case TExprNodeType::BINARY_PRED :
451498 RETURN_IF_PUSH_DOWN (
452499 _normalize_binary_predicate (context, expr, slot,
453- _slot_id_to_predicates[slot->id ()],
454- value_range, &pdt),
500+ slot_predicates, value_range, &pdt),
455501 status);
456502 break ;
457503 case TExprNodeType::FUNCTION_CALL :
458504 if (expr->is_topn_filter ()) {
459- RETURN_IF_PUSH_DOWN (
460- _normalize_topn_filter (context, expr, slot,
461- _slot_id_to_predicates[slot->id ()],
462- &pdt),
463- status);
505+ RETURN_IF_PUSH_DOWN (_normalize_topn_filter (context, expr, slot,
506+ slot_predicates, &pdt),
507+ status);
464508 } else {
465509 RETURN_IF_PUSH_DOWN (_normalize_is_null_predicate (
466- context, expr, slot,
467- _slot_id_to_predicates[slot->id ()],
510+ context, expr, slot, slot_predicates,
468511 value_range, &pdt),
469512 status);
470513 }
471514 break ;
472515 case TExprNodeType::BITMAP_PRED :
473- RETURN_IF_PUSH_DOWN (_normalize_bitmap_filter (
474- context, root, slot,
475- _slot_id_to_predicates[slot->id ()], &pdt),
516+ RETURN_IF_PUSH_DOWN (_normalize_bitmap_filter (context, root, slot,
517+ slot_predicates, &pdt),
476518 status);
477519 break ;
478520 case TExprNodeType::BLOOM_PRED :
479- RETURN_IF_PUSH_DOWN (_normalize_bloom_filter (
480- context, root, slot,
481- _slot_id_to_predicates[slot->id ()], &pdt),
521+ RETURN_IF_PUSH_DOWN (_normalize_bloom_filter (context, root, slot,
522+ slot_predicates, &pdt),
482523 status);
483524 break ;
484525 default :
@@ -493,6 +534,18 @@ Status ScanLocalState<Derived>::_normalize_predicate(VExprContext* context, cons
493534 },
494535 *range);
495536 RETURN_IF_ERROR (status);
537+ if (pdt != PushDownType::UNACCEPTABLE ) {
538+ auto handle = context->scan_filter_handle ();
539+ if (!handle) {
540+ handle = _register_scan_filter (root, slot);
541+ context->attach_scan_filter (handle);
542+ }
543+ if (handle) {
544+ for (size_t i = predicates_before; i < slot_predicates.size (); ++i) {
545+ slot_predicates[i]->attach_scan_filter (handle);
546+ }
547+ }
548+ }
496549 }
497550 if (pdt == PushDownType::ACCEPTABLE && slotref != nullptr &&
498551 slotref->data_type ()->get_primitive_type () == PrimitiveType::TYPE_VARIANT ) {
@@ -614,7 +667,12 @@ Status ScanLocalStateBase::_normalize_function_filters(VExprContext* expr_ctx, S
614667 expr_ctx, &val, &fn_ctx, temp_pdt));
615668 if (temp_pdt != PushDownType::UNACCEPTABLE ) {
616669 std::string col = slot->col_name ();
617- _push_down_functions.emplace_back (opposite, col, fn_ctx, val);
670+ auto handle = expr_ctx->scan_filter_handle ();
671+ if (!handle) {
672+ handle = _register_scan_filter (expr_ctx->root (), slot);
673+ expr_ctx->attach_scan_filter (handle);
674+ }
675+ _push_down_functions.emplace_back (opposite, col, fn_ctx, val, handle);
618676 *pdt = temp_pdt;
619677 }
620678 }
@@ -1343,7 +1401,12 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
13431401 std::list<std::shared_ptr<ScannerDelegate>> {}.swap (_scanners);
13441402 COUNTER_SET (_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time ());
13451403 COUNTER_SET (_wait_for_rf_timer, rf_time);
1346- _helper.collect_realtime_profile (custom_profile ());
1404+ _helper.collect_realtime_profile (custom_profile (), _scan_filter_profile.get ());
1405+ if (_scan_filter_profile != nullptr ) {
1406+ _scan_filter_profile->set_runtime_filter_partition_pruning_stats (
1407+ _runtime_filter_partition_pruning_stats ());
1408+ _scan_filter_profile->materialize (custom_profile (), state->profile_level ());
1409+ }
13471410 return PipelineXLocalState<>::close (state);
13481411}
13491412
0 commit comments