2121#include < gen_cpp/Exprs_types.h>
2222#include < gen_cpp/Metrics_types.h>
2323
24+ #include < algorithm>
2425#include < cstdint>
2526#include < memory>
2627
@@ -77,6 +78,14 @@ Status ScanLocalStateBase::update_late_arrival_runtime_filter(RuntimeState* stat
7778 size_t conjuncts_before = _conjuncts.size ();
7879 RETURN_IF_ERROR (_helper.try_append_late_arrival_runtime_filter (state, _parent->row_descriptor (),
7980 arrived_rf_num, _conjuncts));
81+ if (_scan_filter_profile != nullptr ) {
82+ for (size_t i = conjuncts_before; i < _conjuncts.size (); ++i) {
83+ if (_conjuncts[i]->root () != nullptr && !_conjuncts[i]->scan_filter_handle ()) {
84+ _conjuncts[i]->attach_scan_filter (
85+ _register_scan_filter (_conjuncts[i]->root (), nullptr ));
86+ }
87+ }
88+ }
8089 if (state->enable_adjust_conjunct_order_by_cost ()) {
8190 std::ranges::sort (_conjuncts, [](const auto & a, const auto & b) {
8291 return a->execute_cost () < b->execute_cost ();
@@ -134,6 +143,12 @@ Status ScanLocalStateBase::_do_partition_pruning_by_rf() {
134143 return Status::OK ();
135144}
136145
146+ ScanRuntimeFilterPartitionPruningStats ScanLocalStateBase::_runtime_filter_partition_pruning_stats ()
147+ const {
148+ return {.total_partitions = _total_partitions_rf_counter->value (),
149+ .pruned_partitions = _partitions_pruned_by_rf_counter->value ()};
150+ }
151+
137152int ScanLocalStateBase::max_scanners_concurrency (RuntimeState* state) const {
138153 // For select * from table limit 10; should just use one thread.
139154 if (should_run_serial ()) {
@@ -180,6 +195,9 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info)
180195 SCOPED_TIMER (_init_timer);
181196 auto & p = _parent->cast <typename Derived::Parent>();
182197 _max_pushdown_conditions_per_column = p._max_pushdown_conditions_per_column ;
198+ if (state->enable_profile () && state->profile_level () >= 1 ) {
199+ _scan_filter_profile = std::make_shared<ScanFilterProfile>();
200+ }
183201 RETURN_IF_ERROR (_helper.init (state, p.is_serial_operator (), p.node_id (), p.operator_id (),
184202 _filter_dependencies, p.get_name () + " _FILTER_DEPENDENCY" ));
185203 RETURN_IF_ERROR (_init_profile ());
@@ -207,6 +225,30 @@ static std::string predicates_to_string(
207225 return fmt::to_string (debug_string_buffer);
208226}
209227
228+ ScanFilterHandle ScanLocalStateBase::_register_scan_filter (const VExprSPtr& root,
229+ const SlotDescriptor* slot) {
230+ if (_scan_filter_profile == nullptr || root == nullptr ) {
231+ return {};
232+ }
233+
234+ ScanFilterDesc desc;
235+ desc.kind = ScanFilterKind::NORMAL;
236+ if (root->is_rf_wrapper ()) {
237+ desc.kind = ScanFilterKind::RUNTIME_FILTER;
238+ desc.runtime_filter_id = assert_cast<RuntimeFilterExpr*>(root.get ())->filter_id ();
239+ } else if (root->is_topn_filter ()) {
240+ desc.kind = ScanFilterKind::TOPN_FILTER;
241+ }
242+ if (slot != nullptr ) {
243+ desc.slot_id = slot->id ();
244+ desc.column_name = slot->col_name ();
245+ desc.column_id = _parent->intermediate_row_desc ().get_column_id (slot->id ());
246+ }
247+ desc.debug_string = root->debug_string ();
248+ desc.compact_info = desc.debug_string ;
249+ return _scan_filter_profile->register_filter (std::move (desc));
250+ }
251+
210252template <typename Derived>
211253Status ScanLocalState<Derived>::open(RuntimeState* state) {
212254 SCOPED_TIMER (exec_time_counter ());
@@ -251,7 +293,7 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
251293
252294 RETURN_IF_ERROR (_process_conjuncts (state));
253295
254- if (state->enable_profile ()) {
296+ if (state->enable_profile () && _scan_filter_profile == nullptr ) {
255297 custom_profile ()->add_info_string (" PushDownPredicates" ,
256298 predicates_to_string (_slot_id_to_predicates));
257299 }
@@ -354,6 +396,9 @@ Status ScanLocalState<Derived>::_normalize_conjuncts(RuntimeState* state) {
354396 RETURN_IF_ERROR (_normalize_predicate (conjunct.get (), conjunct->root (), new_root));
355397 if (new_root) {
356398 conjunct->set_root (new_root);
399+ if (!conjunct->scan_filter_handle ()) {
400+ conjunct->attach_scan_filter (_register_scan_filter (conjunct->root (), nullptr ));
401+ }
357402 if (_should_push_down_common_expr (conjunct->root ())) {
358403 _common_expr_ctxs_push_down.emplace_back (conjunct);
359404 it = _conjuncts.erase (it);
@@ -369,7 +414,7 @@ Status ScanLocalState<Derived>::_normalize_conjuncts(RuntimeState* state) {
369414 ++it;
370415 }
371416
372- if (state->enable_profile ()) {
417+ if (state->enable_profile () && _scan_filter_profile == nullptr ) {
373418 std::string message;
374419 for (auto & conjunct : _conjuncts) {
375420 if (conjunct->root ()) {
@@ -424,14 +469,16 @@ Status ScanLocalState<Derived>::_normalize_predicate(VExprContext* context, cons
424469 }
425470 if (_is_predicate_acting_on_slot (expr_root->children (), &slot, &range)) {
426471 Status status = Status::OK ();
472+ auto & slot_predicates = _slot_id_to_predicates[slot->id ()];
473+ const size_t predicates_before = slot_predicates.size ();
427474 std::visit (
428475 [&](auto & value_range) {
429476 auto expr = root->is_rf_wrapper () ? root->get_impl () : root;
430477 {
431478 Defer attach_defer = [&]() {
432479 if (pdt != PushDownType::UNACCEPTABLE && root->is_rf_wrapper ()) {
433480 auto * rf_expr = assert_cast<RuntimeFilterExpr*>(root.get ());
434- _slot_id_to_predicates[slot-> id ()] .back ()->attach_profile_counter (
481+ slot_predicates .back ()->attach_profile_counter (
435482 rf_expr->filter_id (),
436483 rf_expr->predicate_filtered_rows_counter (),
437484 rf_expr->predicate_input_rows_counter (),
@@ -442,43 +489,36 @@ Status ScanLocalState<Derived>::_normalize_predicate(VExprContext* context, cons
442489 switch (expr->node_type ()) {
443490 case TExprNodeType::IN_PRED:
444491 RETURN_IF_PUSH_DOWN (
445- _normalize_in_predicate (context, expr, slot,
446- _slot_id_to_predicates[slot->id ()],
492+ _normalize_in_predicate (context, expr, slot, slot_predicates,
447493 value_range, &pdt),
448494 status);
449495 break ;
450496 case TExprNodeType::BINARY_PRED:
451497 RETURN_IF_PUSH_DOWN (
452498 _normalize_binary_predicate (context, expr, slot,
453- _slot_id_to_predicates[slot->id ()],
454- value_range, &pdt),
499+ slot_predicates, value_range, &pdt),
455500 status);
456501 break ;
457502 case TExprNodeType::FUNCTION_CALL:
458503 if (expr->is_topn_filter ()) {
459- RETURN_IF_PUSH_DOWN (
460- _normalize_topn_filter (context, expr, slot,
461- _slot_id_to_predicates[slot->id ()],
462- &pdt),
463- status);
504+ RETURN_IF_PUSH_DOWN (_normalize_topn_filter (context, expr, slot,
505+ slot_predicates, &pdt),
506+ status);
464507 } else {
465508 RETURN_IF_PUSH_DOWN (_normalize_is_null_predicate (
466- context, expr, slot,
467- _slot_id_to_predicates[slot->id ()],
509+ context, expr, slot, slot_predicates,
468510 value_range, &pdt),
469511 status);
470512 }
471513 break ;
472514 case TExprNodeType::BITMAP_PRED:
473- RETURN_IF_PUSH_DOWN (_normalize_bitmap_filter (
474- context, root, slot,
475- _slot_id_to_predicates[slot->id ()], &pdt),
515+ RETURN_IF_PUSH_DOWN (_normalize_bitmap_filter (context, root, slot,
516+ slot_predicates, &pdt),
476517 status);
477518 break ;
478519 case TExprNodeType::BLOOM_PRED:
479- RETURN_IF_PUSH_DOWN (_normalize_bloom_filter (
480- context, root, slot,
481- _slot_id_to_predicates[slot->id ()], &pdt),
520+ RETURN_IF_PUSH_DOWN (_normalize_bloom_filter (context, root, slot,
521+ slot_predicates, &pdt),
482522 status);
483523 break ;
484524 default :
@@ -493,6 +533,18 @@ Status ScanLocalState<Derived>::_normalize_predicate(VExprContext* context, cons
493533 },
494534 *range);
495535 RETURN_IF_ERROR (status);
536+ if (pdt != PushDownType::UNACCEPTABLE) {
537+ auto handle = context->scan_filter_handle ();
538+ if (!handle) {
539+ handle = _register_scan_filter (root, slot);
540+ context->attach_scan_filter (handle);
541+ }
542+ if (handle) {
543+ for (size_t i = predicates_before; i < slot_predicates.size (); ++i) {
544+ slot_predicates[i]->attach_scan_filter (handle);
545+ }
546+ }
547+ }
496548 }
497549 if (pdt == PushDownType::ACCEPTABLE && slotref != nullptr &&
498550 slotref->data_type ()->get_primitive_type () == PrimitiveType::TYPE_VARIANT) {
@@ -614,7 +666,12 @@ Status ScanLocalStateBase::_normalize_function_filters(VExprContext* expr_ctx, S
614666 expr_ctx, &val, &fn_ctx, temp_pdt));
615667 if (temp_pdt != PushDownType::UNACCEPTABLE) {
616668 std::string col = slot->col_name ();
617- _push_down_functions.emplace_back (opposite, col, fn_ctx, val);
669+ auto handle = expr_ctx->scan_filter_handle ();
670+ if (!handle) {
671+ handle = _register_scan_filter (expr_ctx->root (), slot);
672+ expr_ctx->attach_scan_filter (handle);
673+ }
674+ _push_down_functions.emplace_back (opposite, col, fn_ctx, val, handle);
618675 *pdt = temp_pdt;
619676 }
620677 }
@@ -1343,7 +1400,12 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
13431400 std::list<std::shared_ptr<ScannerDelegate>> {}.swap (_scanners);
13441401 COUNTER_SET (_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time ());
13451402 COUNTER_SET (_wait_for_rf_timer, rf_time);
1346- _helper.collect_realtime_profile (custom_profile ());
1403+ _helper.collect_realtime_profile (custom_profile (), _scan_filter_profile.get ());
1404+ if (_scan_filter_profile != nullptr ) {
1405+ _scan_filter_profile->set_runtime_filter_partition_pruning_stats (
1406+ _runtime_filter_partition_pruning_stats ());
1407+ _scan_filter_profile->materialize (custom_profile (), state->profile_level ());
1408+ }
13471409 return PipelineXLocalState<>::close (state);
13481410}
13491411
0 commit comments