Skip to content

Commit d5e9518

Browse files
authored
[Bug](scan) Preserve IN_LIST runtime filter predicates when key range is a scope range (#62027)
This pull request addresses a bug in the OLAP scan operator where `IN_LIST` predicates could be incorrectly erased when both `MINMAX` and `IN` runtime filters targeted the same key column, and the number of `IN` values exceeded the maximum allowed for pushdown. The changes ensure that `IN_LIST` predicates are preserved in such cases, preventing incorrect query results. Additionally, a regression test is added to verify the fix. **Bug fix in predicate handling:** * Modified the logic in `_build_key_ranges_and_filters()` within `olap_scan_operator.cpp` to ensure that `IN_LIST` predicates are not erased when the key range is a scope range (e.g., `>= X AND <= Y`) and the `IN` filter's value count exceeds `max_pushdown_conditions_per_column`. This preserves filtering semantics that are not captured by the scope range. [[1]](diffhunk://#diff-3ddc75656071d9c0e6b0be450e152a1c94559f7e70ea820e7f0c80a7078e3292R972) [[2]](diffhunk://#diff-3ddc75656071d9c0e6b0be450e152a1c94559f7e70ea820e7f0c80a7078e3292R986) [[3]](diffhunk://#diff-3ddc75656071d9c0e6b0be450e152a1c94559f7e70ea820e7f0c80a7078e3292L986-R1013) * Enhanced the profiling output in `_process_conjuncts()` to accurately reflect the set of predicates that will reach the storage layer after key range and filter construction. This helps with debugging and verification of predicate pushdown. **Testing and regression coverage:** * Added a new regression test `test_rf_in_list_not_erased_by_scope_range.groovy` to verify that `IN_LIST` predicates are not incorrectly erased when both `MINMAX` and `IN` filters are present and the `IN` list is too large to be absorbed into the key range. * Added the corresponding expected output file for the new regression test.
1 parent 1ce2e40 commit d5e9518

8 files changed

Lines changed: 166 additions & 36 deletions

File tree

be/src/exec/operator/olap_scan_operator.cpp

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -951,6 +951,7 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() {
951951
const auto& value_range = iter->second;
952952

953953
std::optional<int> key_to_erase;
954+
bool is_fixed_value_range = false;
954955

955956
RETURN_IF_ERROR(std::visit(
956957
[&](auto&& range) {
@@ -964,6 +965,7 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() {
964965
&exact_range, &eos, &should_break));
965966
if (exact_range) {
966967
key_to_erase = iter->first;
968+
is_fixed_value_range = range.is_fixed_value_range();
967969
}
968970
} else {
969971
// if exceed max_pushdown_conditions_per_column, use whole_value_rang instead
@@ -981,9 +983,49 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() {
981983
if (key_to_erase.has_value()) {
982984
_slot_id_to_value_range.erase(*key_to_erase);
983985

986+
// Determine which predicates are subsumed by the scan key range and can
987+
// be removed. The rule depends on the ColumnValueRange type:
988+
//
989+
// Fixed value range → scan keys are exact point lookups, so both
990+
// comparison (EQ/LT/LE/GT/GE) and positive IN_LIST
991+
// predicates are fully captured and can be erased.
992+
//
993+
// Scope range → scan keys only capture [low, high] boundaries,
994+
// so only comparison predicates are subsumed.
995+
// IN_LIST predicates (whose values may NOT have been
996+
// absorbed into the ColumnValueRange, e.g., because
997+
// the value count exceeded max_pushdown_conditions_per_column)
998+
// must be preserved.
999+
//
1000+
// In either case, predicates with negation semantics (effective NE / NOT_IN_LIST)
1001+
// are never subsumed by scan key ranges and must always be preserved.
1002+
auto can_erase_predicate = [is_fixed_value_range](const ColumnPredicate& pred) {
1003+
PredicateType pt = pred.type();
1004+
bool opposite = pred.opposite();
1005+
1006+
// Effective NE: never subsumed by any scan key range.
1007+
if ((pt == PredicateType::NE && !opposite) ||
1008+
(pt == PredicateType::EQ && opposite)) {
1009+
return false;
1010+
}
1011+
// Comparison predicates (EQ/LT/LE/GT/GE) or IS_NULL/IS_NOT_NULL: subsumed by both
1012+
// fixed value and scope ranges.
1013+
if (PredicateTypeTraits::is_comparison(pt) || pt == PredicateType::IS_NULL ||
1014+
pt == PredicateType::IS_NOT_NULL) {
1015+
return true;
1016+
}
1017+
// Effective IN_LIST: only subsumed by fixed value range.
1018+
if ((pt == PredicateType::IN_LIST && !opposite) ||
1019+
(pt == PredicateType::NOT_IN_LIST && opposite)) {
1020+
return is_fixed_value_range;
1021+
}
1022+
// Everything else (BF, BITMAP, NOT_IN_LIST, etc.): keep.
1023+
return false;
1024+
};
1025+
9841026
std::vector<std::shared_ptr<ColumnPredicate>> new_predicates;
9851027
for (const auto& it : _slot_id_to_predicates[*key_to_erase]) {
986-
if (!it->could_be_erased()) {
1028+
if (!can_erase_predicate(*it)) {
9871029
new_predicates.push_back(it);
9881030
}
9891031
}

be/src/exec/operator/scan_operator.cpp

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,23 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info)
153153
return Status::OK();
154154
}
155155

156+
static std::string predicates_to_string(
157+
const phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>&
158+
slot_id_to_predicates) {
159+
fmt::memory_buffer debug_string_buffer;
160+
for (const auto& [slot_id, predicates] : slot_id_to_predicates) {
161+
if (predicates.empty()) {
162+
continue;
163+
}
164+
fmt::format_to(debug_string_buffer, "Slot ID: {}: [", slot_id);
165+
for (const auto& predicate : predicates) {
166+
fmt::format_to(debug_string_buffer, "{{{}}}, ", predicate->debug_string());
167+
}
168+
fmt::format_to(debug_string_buffer, "] ");
169+
}
170+
return fmt::to_string(debug_string_buffer);
171+
}
172+
156173
template <typename Derived>
157174
Status ScanLocalState<Derived>::open(RuntimeState* state) {
158175
SCOPED_TIMER(exec_time_counter());
@@ -193,6 +210,11 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
193210

194211
RETURN_IF_ERROR(_process_conjuncts(state));
195212

213+
if (state->enable_profile()) {
214+
custom_profile()->add_info_string("PushDownPredicates",
215+
predicates_to_string(_slot_id_to_predicates));
216+
}
217+
196218
auto status = _eos ? Status::OK() : _prepare_scanners();
197219
RETURN_IF_ERROR(status);
198220
if (auto ctx = _scanner_ctx.load()) {
@@ -203,23 +225,6 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
203225
return status;
204226
}
205227

206-
static std::string predicates_to_string(
207-
const phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>&
208-
slot_id_to_predicates) {
209-
fmt::memory_buffer debug_string_buffer;
210-
for (const auto& [slot_id, predicates] : slot_id_to_predicates) {
211-
if (predicates.empty()) {
212-
continue;
213-
}
214-
fmt::format_to(debug_string_buffer, "Slot ID: {}: [", slot_id);
215-
for (const auto& predicate : predicates) {
216-
fmt::format_to(debug_string_buffer, "{{{}}}, ", predicate->debug_string());
217-
}
218-
fmt::format_to(debug_string_buffer, "] ");
219-
}
220-
return fmt::to_string(debug_string_buffer);
221-
}
222-
223228
static void init_slot_value_range(
224229
phmap::flat_hash_map<int, ColumnValueRangeType>& slot_id_to_value_range,
225230
SlotDescriptor* slot, const DataTypePtr type_desc) {
@@ -325,8 +330,6 @@ Status ScanLocalState<Derived>::_normalize_conjuncts(RuntimeState* state) {
325330
}
326331

327332
if (state->enable_profile()) {
328-
custom_profile()->add_info_string("PushDownPredicates",
329-
predicates_to_string(_slot_id_to_predicates));
330333
std::string message;
331334
for (auto& conjunct : _conjuncts) {
332335
if (conjunct->root()) {

be/src/storage/predicate/column_predicate.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,8 +226,7 @@ class ColumnPredicate : public std::enable_shared_from_this<ColumnPredicate> {
226226
}
227227

228228
virtual double get_ignore_threshold() const { return 0; }
229-
// If this predicate acts on the key column, this predicate should be erased.
230-
virtual bool could_be_erased() const { return false; }
229+
231230
// Return the size of value set for IN/NOT IN predicates and 0 for others.
232231
virtual std::string debug_string() const {
233232
fmt::memory_buffer debug_string_buffer;

be/src/storage/predicate/comparison_predicate.h

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,6 @@ class ComparisonPredicateBase final : public ColumnPredicate {
5151
ColumnPredicate::debug_string());
5252
return fmt::to_string(debug_string_buffer);
5353
}
54-
bool could_be_erased() const override {
55-
if ((PT == PredicateType::NE && !_opposite) || (PT == PredicateType::EQ && _opposite)) {
56-
return false;
57-
}
58-
return true;
59-
}
6054

6155
PredicateType type() const override { return PT; }
6256

be/src/storage/predicate/in_list_predicate.h

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,6 @@ class InListPredicateBase final : public ColumnPredicate {
147147

148148
PredicateType type() const override { return PT; }
149149

150-
bool could_be_erased() const override {
151-
if ((PT == PredicateType::NOT_IN_LIST && !_opposite) ||
152-
(PT == PredicateType::IN_LIST && _opposite)) {
153-
return false;
154-
}
155-
return true;
156-
}
157150
Status evaluate(const IndexFieldNameAndTypePair& name_with_type, IndexIterator* iterator,
158151
uint32_t num_rows, roaring::Roaring* result) const override {
159152
if (iterator == nullptr) {

be/src/storage/predicate/null_predicate.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ class NullPredicate final : public ColumnPredicate {
5858
ColumnPredicate::debug_string(), _is_null);
5959
return fmt::to_string(debug_string_buffer);
6060
}
61-
bool could_be_erased() const override { return true; }
6261

6362
PredicateType type() const override;
6463

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !join --
3+
10 10
4+
12 12
5+
2 2
6+
4 4
7+
6 6
8+
8 8
9+
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
// This test verifies that when both MINMAX and IN runtime filters target the same
19+
// key column, and the IN filter's value count exceeds max_pushdown_conditions_per_column,
20+
// the IN_LIST predicate is NOT incorrectly erased by the key range construction logic.
21+
// Regression test for the bug where _build_key_ranges_and_filters() erased IN_LIST
22+
// predicates when the ColumnValueRange was a scope range (from MINMAX filter).
23+
suite("test_rf_in_list_not_erased_by_scope_range") {
24+
sql "drop table if exists rf_scope_probe;"
25+
sql "drop table if exists rf_scope_build;"
26+
27+
sql """
28+
CREATE TABLE rf_scope_probe (
29+
k1 BIGINT,
30+
v1 INT
31+
)
32+
DUPLICATE KEY(k1)
33+
DISTRIBUTED BY HASH(k1) BUCKETS 1
34+
PROPERTIES ("replication_num" = "1");
35+
"""
36+
37+
sql """
38+
CREATE TABLE rf_scope_build (
39+
k1 BIGINT,
40+
v1 INT
41+
)
42+
DUPLICATE KEY(k1)
43+
DISTRIBUTED BY HASH(k1) BUCKETS 1
44+
PROPERTIES ("replication_num" = "1");
45+
"""
46+
47+
// Probe table: insert 20 rows with k1 from 1 to 20.
48+
// The build side will only match a subset (k1 in {2,4,6,8,10,12}).
49+
// Rows NOT in this subset (k1=1,3,5,7,9,11,13..20) should be filtered out
50+
// by the IN_LIST runtime filter.
51+
sql """
52+
INSERT INTO rf_scope_probe VALUES
53+
(1, 1), (2, 2), (3, 3), (4, 4), (5, 5),
54+
(6, 6), (7, 7), (8, 8), (9, 9), (10, 10),
55+
(11, 11), (12, 12), (13, 13), (14, 14), (15, 15),
56+
(16, 16), (17, 17), (18, 18), (19, 19), (20, 20);
57+
"""
58+
59+
// Build table: 6 distinct k1 values. This exceeds max_pushdown_conditions_per_column=5
60+
// so the IN values are NOT added to ColumnValueRange, but the IN_LIST predicate is created.
61+
// MINMAX range: [2, 12]
62+
sql """
63+
INSERT INTO rf_scope_build VALUES
64+
(2, 100), (4, 200), (6, 300), (8, 400), (10, 500), (12, 600);
65+
"""
66+
67+
sql "sync;"
68+
69+
// Set max_pushdown_conditions_per_column to 5, so the 6 IN values exceed it.
70+
// This causes IN values to NOT be added to the ColumnValueRange (it stays as
71+
// a scope range from the MINMAX filter), but the IN_LIST ColumnPredicate is still created.
72+
sql "set max_pushdown_conditions_per_column = 5;"
73+
// Use both IN and MIN_MAX runtime filter types so both are generated on the join key.
74+
sql "set runtime_filter_type = 'IN_OR_BLOOM_FILTER,MIN_MAX';"
75+
sql "set runtime_filter_wait_time_ms = 10000;"
76+
sql "set runtime_filter_wait_infinitely = true;"
77+
sql "set enable_runtime_filter_prune = false;"
78+
sql "set enable_left_semi_direct_return_opt = true;"
79+
sql "set parallel_pipeline_task_num = 1;"
80+
81+
// The join should only return 6 rows (matching k1 in {2,4,6,8,10,12}).
82+
// If the IN_LIST predicate is incorrectly erased, the MINMAX scope [2,12]
83+
// would let through rows with k1 in {3,5,7,9,11} as well, producing wrong results.
84+
// We verify correctness by checking the result.
85+
order_qt_join """
86+
SELECT p.k1, p.v1
87+
FROM rf_scope_probe p
88+
LEFT SEMI JOIN rf_scope_build b ON p.k1 = b.k1
89+
ORDER BY p.k1;
90+
"""
91+
}

0 commit comments

Comments
 (0)