Skip to content

Commit 191061a

Browse files
BiteTheDDDDt, Copilot, Copilot
authored
[fix](be) Fix RuntimeFilter selectivity sampling_frequency lost during VExprContext recreation (#62355)
Introduced by #59832.

Problem Summary: RuntimeFilter selectivity tracking was completely non-functional because `sampling_frequency` was lost during VExprContext recreation.

In `RuntimeFilterConsumer::_get_push_exprs()`, `sampling_frequency=32` was set on a temporary `probe_ctx` VExprContext. However, only the VRuntimeFilterWrapper expressions (VExpr) were returned, not the VExprContext. When `_append_rf_into_conjuncts()` later created a new VExprContext via `VExprContext::create_shared(expr)`, the new context had the default `_sampling_frequency=-1` (DISABLE_SAMPLING).

With `_sampling_frequency=-1`, the condition `(_judge_counter++) >= -1` evaluated to `0 >= -1 → true` on every call, causing `reset_judge_selectivity()` to fire every time. This meant the selectivity counters were perpetually reset and never accumulated, making the runtime filter selectivity optimization completely ineffective.

**Fix**: Store `sampling_frequency` in VRuntimeFilterWrapper (which survives VExprContext recreation) and propagate it to VExprContext in `VRuntimeFilterWrapper::open()`, which is called on both original and cloned contexts.

### Release note

Fixed a bug where RuntimeFilter selectivity tracking was non-functional due to sampling_frequency being lost during VExprContext recreation, causing runtime filters that should be skipped (due to low selectivity) to never be identified.

### Check List (For Author)

- Test: Unit Test
  - Added 2 regression tests to runtime_filter_selectivity_test.cpp
  - Added 3 new tests in vruntimefilter_wrapper_sampling_test.cpp
  - All 22 tests pass
- Behavior changed: No (selectivity tracking was broken before, this makes it work as designed)
- Does this need documentation: No

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
1 parent ea575d9 commit 191061a

File tree

5 files changed

+248
-15
lines changed

5 files changed

+248
-15
lines changed

be/src/exec/runtime_filter/runtime_filter_consumer.cpp

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "exec/runtime_filter/runtime_filter_consumer.h"
1919

20+
#include "exec/runtime_filter/runtime_filter_selectivity.h"
2021
#include "exprs/minmax_predicate.h"
2122
#include "exprs/vbitmap_predicate.h"
2223
#include "exprs/vbloom_predicate.h"
@@ -82,11 +83,11 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
8283
auto real_filter_type = _wrapper->get_real_type();
8384
bool null_aware = _wrapper->contain_null();
8485

85-
// Set sampling frequency based on disable_always_true_logic status
86+
// Determine sampling frequency for the always_true optimization.
87+
// This will be propagated to VExprContext in VRuntimeFilterWrapper::open().
8688
int sampling_frequency = _wrapper->disable_always_true_logic()
8789
? RuntimeFilterSelectivity::DISABLE_SAMPLING
8890
: config::runtime_filter_sampling_frequency;
89-
probe_ctx->get_runtime_filter_selectivity().set_sampling_frequency(sampling_frequency);
9091

9192
switch (real_filter_type) {
9293
case RuntimeFilterType::IN_FILTER: {
@@ -104,7 +105,7 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
104105
in_pred->add_child(probe_ctx->root());
105106
auto wrapper = VRuntimeFilterWrapper::create_shared(
106107
node, in_pred, get_in_list_ignore_thredhold(_wrapper->hybrid_set()->size()),
107-
null_aware, _wrapper->filter_id());
108+
null_aware, _wrapper->filter_id(), sampling_frequency);
108109
container.push_back(wrapper);
109110
break;
110111
}
@@ -124,7 +125,7 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
124125
}
125126
container.push_back(VRuntimeFilterWrapper::create_shared(
126127
min_pred_node, min_pred, get_comparison_ignore_thredhold(), null_aware,
127-
_wrapper->filter_id()));
128+
_wrapper->filter_id(), sampling_frequency));
128129
break;
129130
}
130131
case RuntimeFilterType::MAX_FILTER: {
@@ -143,7 +144,7 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
143144
}
144145
container.push_back(VRuntimeFilterWrapper::create_shared(
145146
max_pred_node, max_pred, get_comparison_ignore_thredhold(), null_aware,
146-
_wrapper->filter_id()));
147+
_wrapper->filter_id(), sampling_frequency));
147148
break;
148149
}
149150
case RuntimeFilterType::MINMAX_FILTER: {
@@ -159,7 +160,7 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
159160
max_pred->add_child(max_literal);
160161
container.push_back(VRuntimeFilterWrapper::create_shared(
161162
max_pred_node, max_pred, get_comparison_ignore_thredhold(), null_aware,
162-
_wrapper->filter_id()));
163+
_wrapper->filter_id(), sampling_frequency));
163164

164165
VExprContextSPtr new_probe_ctx;
165166
RETURN_IF_ERROR(VExpr::create_expr_tree(probe_expr, new_probe_ctx));
@@ -176,7 +177,7 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
176177
min_pred->add_child(min_literal);
177178
container.push_back(VRuntimeFilterWrapper::create_shared(
178179
min_pred_node, min_pred, get_comparison_ignore_thredhold(), null_aware,
179-
_wrapper->filter_id()));
180+
_wrapper->filter_id(), sampling_frequency));
180181
break;
181182
}
182183
case RuntimeFilterType::BLOOM_FILTER: {
@@ -191,9 +192,9 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
191192
auto bloom_pred = VBloomPredicate::create_shared(node);
192193
bloom_pred->set_filter(_wrapper->bloom_filter_func());
193194
bloom_pred->add_child(probe_ctx->root());
194-
auto wrapper = VRuntimeFilterWrapper::create_shared(node, bloom_pred,
195-
get_bloom_filter_ignore_thredhold(),
196-
null_aware, _wrapper->filter_id());
195+
auto wrapper = VRuntimeFilterWrapper::create_shared(
196+
node, bloom_pred, get_bloom_filter_ignore_thredhold(), null_aware,
197+
_wrapper->filter_id(), sampling_frequency);
197198
container.push_back(wrapper);
198199
break;
199200
}
@@ -212,8 +213,8 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
212213
if (null_aware) {
213214
return Status::InternalError("bitmap predicate do not support null aware");
214215
}
215-
auto wrapper = VRuntimeFilterWrapper::create_shared(node, bitmap_pred, 0, null_aware,
216-
_wrapper->filter_id());
216+
auto wrapper = VRuntimeFilterWrapper::create_shared(
217+
node, bitmap_pred, 0, null_aware, _wrapper->filter_id(), sampling_frequency);
217218
container.push_back(wrapper);
218219
break;
219220
}

be/src/exprs/vruntimefilter_wrapper.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,13 @@ class VExprContext;
5656

5757
VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExprSPtr impl,
5858
double ignore_thredhold, bool null_aware,
59-
int filter_id)
59+
int filter_id, int sampling_frequency)
6060
: VExpr(node),
6161
_impl(std::move(impl)),
6262
_ignore_thredhold(ignore_thredhold),
6363
_null_aware(null_aware),
64-
_filter_id(filter_id) {}
64+
_filter_id(filter_id),
65+
_sampling_frequency(sampling_frequency) {}
6566

6667
Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc,
6768
VExprContext* context) {
@@ -75,6 +76,7 @@ Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
7576
FunctionContext::FunctionStateScope scope) {
7677
DCHECK(_prepare_finished);
7778
RETURN_IF_ERROR(_impl->open(state, context, scope));
79+
context->get_runtime_filter_selectivity().set_sampling_frequency(_sampling_frequency);
7880
_open_finished = true;
7981
return Status::OK();
8082
}

be/src/exprs/vruntimefilter_wrapper.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
#include "common/config.h"
2828
#include "common/status.h"
29+
#include "exec/runtime_filter/runtime_filter_selectivity.h"
2930
#include "exprs/function_context.h"
3031
#include "exprs/vexpr.h"
3132
#include "runtime/runtime_profile.h"
@@ -50,7 +51,8 @@ class VRuntimeFilterWrapper final : public VExpr {
5051

5152
public:
5253
VRuntimeFilterWrapper(const TExprNode& node, VExprSPtr impl, double ignore_thredhold,
53-
bool null_aware, int filter_id);
54+
bool null_aware, int filter_id,
55+
int sampling_frequency = RuntimeFilterSelectivity::DISABLE_SAMPLING);
5456
~VRuntimeFilterWrapper() override = default;
5557
Status execute_column(VExprContext* context, const Block* block, Selector* selector,
5658
size_t count, ColumnPtr& result_column) const override;
@@ -125,6 +127,7 @@ class VRuntimeFilterWrapper final : public VExpr {
125127
double _ignore_thredhold;
126128
bool _null_aware;
127129
int _filter_id;
130+
int _sampling_frequency;
128131
};
129132

130133
using VRuntimeFilterPtr = std::shared_ptr<VRuntimeFilterWrapper>;

be/test/exec/runtime_filter/runtime_filter_selectivity_test.cpp

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,4 +228,50 @@ TEST_F(RuntimeFilterSelectivityTest, different_thresholds) {
228228
}
229229
}
230230

231+
// Regression test: with default sampling_frequency (-1), update_judge_counter()
232+
// always resets because (_judge_counter++) >= -1 is always true.
233+
// This was the root cause of the selectivity accumulation bug.
234+
TEST_F(RuntimeFilterSelectivityTest, default_sampling_frequency_always_resets) {
235+
RuntimeFilterSelectivity selectivity;
236+
// Don't set sampling_frequency — defaults to DISABLE_SAMPLING (-1)
237+
238+
// Accumulate selectivity data: low filter rate -> should be always_true
239+
selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
240+
// With default -1, maybe_always_true_can_ignore returns false (disabled)
241+
EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
242+
243+
// Now call update_judge_counter — with -1, it immediately resets
244+
selectivity.update_judge_counter();
245+
// Verify: accumulated data has been wiped out by the reset
246+
// Even after setting a valid sampling_frequency, the previously accumulated
247+
// selectivity data is gone
248+
selectivity.set_sampling_frequency(100);
249+
// always_true was reset to false by the premature reset
250+
EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
251+
}
252+
253+
// Verify that setting sampling_frequency correctly prevents premature reset
254+
TEST_F(RuntimeFilterSelectivityTest, proper_sampling_frequency_preserves_accumulation) {
255+
RuntimeFilterSelectivity selectivity;
256+
selectivity.set_sampling_frequency(32);
257+
258+
// Accumulate selectivity: low filter rate
259+
selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
260+
EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
261+
262+
// Counter increments don't reset before reaching sampling_frequency.
263+
// Post-increment semantics: check uses old value, so need 33 calls total
264+
// to trigger reset (counter must reach 32 before comparison fires).
265+
for (int i = 0; i < 32; i++) {
266+
selectivity.update_judge_counter();
267+
}
268+
// Still always_true because counter value 31 was compared last (31 >= 32 → false)
269+
EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
270+
271+
// 33rd call: counter=32, 32 >= 32 → true → triggers reset
272+
selectivity.update_judge_counter();
273+
// After reset, needs re-evaluation
274+
EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
275+
}
276+
231277
} // namespace doris
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include <glog/logging.h>
19+
#include <gtest/gtest.h>
20+
21+
#include "exec/runtime_filter/runtime_filter_selectivity.h"
22+
#include "exec/runtime_filter/runtime_filter_test_utils.h"
23+
#include "exprs/vexpr_context.h"
24+
#include "exprs/vruntimefilter_wrapper.h"
25+
26+
namespace doris {
27+
28+
// Minimal VExpr implementation for testing VRuntimeFilterWrapper in isolation.
29+
class StubVExpr : public VExpr {
30+
public:
31+
StubVExpr() : VExpr(make_texpr_node()) {}
32+
33+
const std::string& expr_name() const override {
34+
static const std::string name = "StubVExpr";
35+
return name;
36+
}
37+
38+
Status execute(VExprContext*, Block*, int*) const override { return Status::OK(); }
39+
40+
Status execute_column(VExprContext*, const Block*, Selector*, size_t,
41+
ColumnPtr&) const override {
42+
return Status::OK();
43+
}
44+
45+
// SLOT_REF is not a constant — without this override, VExpr::is_constant()
46+
// returns true for a leaf node (no children), causing get_const_col() to
47+
// DCHECK-fail on the second open() call.
48+
bool is_constant() const override { return false; }
49+
50+
private:
51+
static TExprNode make_texpr_node() {
52+
return TExprNodeBuilder(TExprNodeType::SLOT_REF,
53+
TTypeDescBuilder()
54+
.set_types(TTypeNodeBuilder()
55+
.set_type(TTypeNodeType::SCALAR)
56+
.set_scalar_type(TPrimitiveType::INT)
57+
.build())
58+
.build(),
59+
0)
60+
.build();
61+
}
62+
};
63+
64+
class VRuntimeFilterWrapperSamplingTest : public RuntimeFilterTest {};
65+
66+
// Test that VRuntimeFilterWrapper stores and propagates sampling_frequency
67+
// through open() to VExprContext. This is the core fix for the bug where
68+
// sampling_frequency was lost when _append_rf_into_conjuncts creates a new
69+
// VExprContext via VExprContext::create_shared(expr).
70+
TEST_F(VRuntimeFilterWrapperSamplingTest, open_propagates_sampling_frequency) {
71+
auto stub = std::make_shared<StubVExpr>();
72+
auto node = TExprNodeBuilder(TExprNodeType::SLOT_REF,
73+
TTypeDescBuilder()
74+
.set_types(TTypeNodeBuilder()
75+
.set_type(TTypeNodeType::SCALAR)
76+
.set_scalar_type(TPrimitiveType::INT)
77+
.build())
78+
.build(),
79+
0)
80+
.build();
81+
82+
const int expected_frequency = 32;
83+
auto wrapper = VRuntimeFilterWrapper::create_shared(node, stub, 0.4, false, /*filter_id=*/1,
84+
expected_frequency);
85+
86+
// Simulate the VExprContext recreation that happens in _append_rf_into_conjuncts.
87+
// A fresh VExprContext has default sampling_frequency = DISABLE_SAMPLING (-1).
88+
auto context = std::make_shared<VExprContext>(wrapper);
89+
ASSERT_EQ(context->get_runtime_filter_selectivity().maybe_always_true_can_ignore(), false);
90+
91+
RowDescriptor row_desc;
92+
ASSERT_TRUE(wrapper->prepare(_runtime_states[0].get(), row_desc, context.get()).ok());
93+
ASSERT_TRUE(
94+
wrapper->open(_runtime_states[0].get(), context.get(), FunctionContext::FRAGMENT_LOCAL)
95+
.ok());
96+
97+
// After open(), sampling_frequency should be propagated from VRuntimeFilterWrapper
98+
// to VExprContext. Verify by accumulating low-selectivity data and checking
99+
// that always_true can now be detected.
100+
auto& selectivity = context->get_runtime_filter_selectivity();
101+
selectivity.update_judge_selectivity(1, 2000, 50000, 0.1);
102+
EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
103+
}
104+
105+
// Test that default sampling_frequency (DISABLE_SAMPLING) disables the always_true
106+
// optimization, matching the behavior when disable_always_true_logic is set.
107+
TEST_F(VRuntimeFilterWrapperSamplingTest, default_sampling_frequency_disables_optimization) {
108+
auto stub = std::make_shared<StubVExpr>();
109+
auto node = TExprNodeBuilder(TExprNodeType::SLOT_REF,
110+
TTypeDescBuilder()
111+
.set_types(TTypeNodeBuilder()
112+
.set_type(TTypeNodeType::SCALAR)
113+
.set_scalar_type(TPrimitiveType::INT)
114+
.build())
115+
.build(),
116+
0)
117+
.build();
118+
119+
// No sampling_frequency argument - uses default DISABLE_SAMPLING
120+
auto wrapper = VRuntimeFilterWrapper::create_shared(node, stub, 0.4, false, /*filter_id=*/1);
121+
122+
auto context = std::make_shared<VExprContext>(wrapper);
123+
RowDescriptor row_desc;
124+
ASSERT_TRUE(wrapper->prepare(_runtime_states[0].get(), row_desc, context.get()).ok());
125+
ASSERT_TRUE(
126+
wrapper->open(_runtime_states[0].get(), context.get(), FunctionContext::FRAGMENT_LOCAL)
127+
.ok());
128+
129+
// Even with low-selectivity data, always_true should NOT be detected
130+
// because sampling is disabled
131+
auto& selectivity = context->get_runtime_filter_selectivity();
132+
selectivity.update_judge_selectivity(1, 2000, 50000, 0.1);
133+
EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
134+
}
135+
136+
// Test that sampling_frequency survives VExprContext recreation, which is the
137+
// exact scenario that caused the original bug.
138+
TEST_F(VRuntimeFilterWrapperSamplingTest, sampling_frequency_survives_context_recreation) {
139+
auto stub = std::make_shared<StubVExpr>();
140+
auto node = TExprNodeBuilder(TExprNodeType::SLOT_REF,
141+
TTypeDescBuilder()
142+
.set_types(TTypeNodeBuilder()
143+
.set_type(TTypeNodeType::SCALAR)
144+
.set_scalar_type(TPrimitiveType::INT)
145+
.build())
146+
.build(),
147+
0)
148+
.build();
149+
150+
const int expected_frequency = 32;
151+
auto wrapper = VRuntimeFilterWrapper::create_shared(node, stub, 0.4, false, /*filter_id=*/1,
152+
expected_frequency);
153+
154+
// First context - prepare and open work
155+
auto context1 = std::make_shared<VExprContext>(wrapper);
156+
RowDescriptor row_desc;
157+
ASSERT_TRUE(wrapper->prepare(_runtime_states[0].get(), row_desc, context1.get()).ok());
158+
ASSERT_TRUE(
159+
wrapper->open(_runtime_states[0].get(), context1.get(), FunctionContext::FRAGMENT_LOCAL)
160+
.ok());
161+
162+
// Create a brand new non-clone VExprContext with the same VRuntimeFilterWrapper,
163+
// matching the production path in _append_rf_into_conjuncts which calls
164+
// VExprContext::create_shared(expr) then conjunct->prepare() and conjunct->open().
165+
auto context2 = std::make_shared<VExprContext>(wrapper);
166+
EXPECT_FALSE(context2->get_runtime_filter_selectivity().maybe_always_true_can_ignore());
167+
168+
// Drive the recreated context through prepare/open via VExprContext (not the
169+
// wrapper directly), matching the production _append_rf_into_conjuncts lifecycle.
170+
ASSERT_TRUE(context2->prepare(_runtime_states[0].get(), row_desc).ok());
171+
ASSERT_TRUE(context2->open(_runtime_states[0].get()).ok());
172+
173+
// After open(), sampling_frequency should be propagated from VRuntimeFilterWrapper
174+
// to context2. Verify by accumulating low-selectivity data and checking that
175+
// always_true can be detected — this is the actual behavior the fix protects.
176+
auto& selectivity = context2->get_runtime_filter_selectivity();
177+
selectivity.update_judge_selectivity(1, 2000, 50000, 0.1);
178+
EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
179+
}
180+
181+
} // namespace doris

0 commit comments

Comments
 (0)