Skip to content

Commit 4acb016

Browse files
authored
fix(query): resolve physical column name for runtime filter probe target (#20105)
* fix(query): resolve physical column name for runtime filter probe target

  When building the bloom runtime filter's probe-side column reference, use the physical column name from the table schema (via column_id) instead of the ColumnBinding's context name. The binding name can be an alias (e.g. SELECT col_b AS col_a), which may collide with another column in the same table. This caused the bloom filter to target the wrong column, reading unrelated data and producing false negatives.

* fix(query): skip physical name lookup for nested/path columns

  Use schema_with_stream() to also resolve stream column names correctly. For nested/path columns or when schema lookup fails, fall back to base_col.column_name (the stable metadata-level name) instead of the potentially-aliased ColumnBinding name.

* ci(query): add stream runtime filter alias test
1 parent 32b7443 commit 4acb016

4 files changed

Lines changed: 263 additions & 7 deletions

File tree

src/query/service/src/physical_plans/physical_hash_join.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -739,11 +739,32 @@ impl PhysicalPlanBuilder {
739739
.base_column_scan_id(*column_idx)
740740
.unwrap();
741741

742+
let metadata = self.metadata.read();
742743
return Ok(Some((
743744
left_condition
744745
.as_raw_expr()
745-
.type_check(&*self.metadata.read())?
746-
.project_column_ref(|col| Ok(col.column_name.clone()))?,
746+
.type_check(&*metadata)?
747+
.project_column_ref(|col| {
748+
// Use the physical column name from the table schema
749+
// (looked up by column_id) rather than the binding's
750+
// context name, which can be an alias that collides
751+
// with another column in the same table.
752+
let entry = metadata.column(col.index);
753+
if let ColumnEntry::BaseTableColumn(base_col) = entry {
754+
if base_col.path_indices.is_none() {
755+
let table = metadata.table(base_col.table_index);
756+
let schema = table.table().schema_with_stream();
757+
if let Ok(field) = schema.field_of_column_id(base_col.column_id)
758+
{
759+
return Ok(field.name().clone());
760+
}
761+
}
762+
// For nested/path columns, or when schema lookup
763+
// fails, use the stable metadata-level column name.
764+
return Ok(base_col.column_name.clone());
765+
}
766+
Ok(col.column_name.clone())
767+
})?,
747768
scan_id,
748769
table_index,
749770
*column_idx,

src/query/service/src/physical_plans/runtime_filter/builder.rs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -280,11 +280,27 @@ fn scalar_to_remote_expr(
280280
return Ok(None);
281281
};
282282

283-
let remote_expr = scalar
284-
.as_raw_expr()
285-
.type_check(&*metadata.read())?
286-
.project_column_ref(|col| Ok(col.column_name.clone()))?
287-
.as_remote_expr();
283+
let remote_expr = {
284+
let md = metadata.read();
285+
scalar
286+
.as_raw_expr()
287+
.type_check(&*md)?
288+
.project_column_ref(|col| {
289+
let entry = md.column(col.index);
290+
if let ColumnEntry::BaseTableColumn(base_col) = entry {
291+
if base_col.path_indices.is_none() {
292+
let table = md.table(base_col.table_index);
293+
let schema = table.table().schema_with_stream();
294+
if let Ok(field) = schema.field_of_column_id(base_col.column_id) {
295+
return Ok(field.name().clone());
296+
}
297+
}
298+
return Ok(base_col.column_name.clone());
299+
}
300+
Ok(col.column_name.clone())
301+
})?
302+
.as_remote_expr()
303+
};
288304

289305
if supported_probe_key_for_runtime_filter(&remote_expr) {
290306
return Ok(Some((remote_expr, scan_id, column_idx)));
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
## Copyright 2023 Databend Cloud
2+
##
3+
## Licensed under the Elastic License, Version 2.0 (the "License");
4+
## you may not use this file except in compliance with the License.
5+
## You may obtain a copy of the License at
6+
##
7+
## https://www.elastic.co/licensing/elastic-license
8+
##
9+
## Unless required by applicable law or agreed to in writing, software
10+
## distributed under the License is distributed on an "AS IS" BASIS,
11+
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
## See the License for the specific language governing permissions and
13+
## limitations under the License.
14+
15+
## Test: runtime filter must resolve hidden stream columns by schema_with_stream().
16+
## Mutations populate _origin_block_row_num; plain inserts leave it NULL.
17+
## The subquery aliases _origin_block_row_num as row_num, colliding with a physical column.
18+
## Bug scenario: RF resolves row_num to the physical column instead of the hidden stream column.
19+
## Expected: 3 matches.
20+
21+
statement ok
22+
DROP DATABASE IF EXISTS test_stream_rf_alias_collision
23+
24+
statement ok
25+
CREATE DATABASE test_stream_rf_alias_collision
26+
27+
statement ok
28+
USE test_stream_rf_alias_collision
29+
30+
statement ok
31+
CREATE TABLE t_stream_probe(row_num UInt64, val UInt64) ROW_PER_BLOCK = 10
32+
33+
statement ok
34+
ALTER TABLE t_stream_probe SET OPTIONS(change_tracking = true)
35+
36+
statement ok
37+
INSERT INTO t_stream_probe VALUES (100, 1), (101, 2), (102, 3), (103, 4), (104, 5), (105, 6), (106, 7), (107, 8)
38+
39+
statement ok
40+
DELETE FROM t_stream_probe WHERE val >= 7
41+
42+
statement ok
43+
CREATE TABLE t_stream_build(row_num UInt64)
44+
45+
statement ok
46+
INSERT INTO t_stream_build VALUES (0), (1), (2)
47+
48+
## Sanity check: the mutation rewrites the surviving rows with origin row numbers.
49+
query II
50+
SELECT row_num, _origin_block_row_num FROM t_stream_probe ORDER BY row_num
51+
----
52+
100 0
53+
101 1
54+
102 2
55+
103 3
56+
104 4
57+
105 5
58+
59+
query T
60+
EXPLAIN SELECT count()
61+
FROM t_stream_build AS b
62+
INNER JOIN (
63+
SELECT _origin_block_row_num AS row_num
64+
FROM t_stream_probe
65+
WHERE _origin_block_row_num IS NOT NULL
66+
) AS p
67+
ON b.row_num = p.row_num
68+
----
69+
AggregateFinal
70+
├── output columns: [count() (#6)]
71+
├── group by: []
72+
├── aggregate functions: [count()]
73+
├── estimated rows: 1.00
74+
└── AggregatePartial
75+
├── group by: []
76+
├── aggregate functions: [count()]
77+
├── estimated rows: 1.00
78+
└── HashJoin
79+
├── output columns: []
80+
├── join type: INNER
81+
├── build keys: [b.row_num (#0)]
82+
├── probe keys: [p.row_num (#5)]
83+
├── keys is null equal: [false]
84+
├── filters: []
85+
├── build join filters:
86+
│ └── filter id:0, build key:b.row_num (#0), probe targets:[p.row_num (#5)@scan1], filter type:bloom,inlist,min_max
87+
├── estimated rows: 3.00
88+
├── TableScan(Build)
89+
│ ├── table: default.test_stream_rf_alias_collision.t_stream_build
90+
│ ├── scan id: 0
91+
│ ├── output columns: [row_num (#0)]
92+
│ ├── read rows: 3
93+
│ ├── read size: < 1 KiB
94+
│ ├── partitions total: 1
95+
│ ├── partitions scanned: 1
96+
│ ├── pruning stats: [segments: <read cost: <slt:ignore>, decompress cost: <slt:ignore>, range pruning: 1 to 1 cost: <slt:ignore>>, blocks: <range pruning: 1 to 1 cost: <slt:ignore>>]
97+
│ ├── push downs: [filters: [is_not_null(t_stream_build.row_num (#0))], limit: NONE]
98+
│ └── estimated rows: 3.00
99+
└── Filter(Probe)
100+
├── output columns: [t_stream_probe._origin_block_row_num (#5)]
101+
├── filters: [is_not_null(t_stream_probe._origin_block_row_num (#5))]
102+
├── estimated rows: 6.00
103+
└── TableScan
104+
├── table: default.test_stream_rf_alias_collision.t_stream_probe
105+
├── scan id: 1
106+
├── output columns: [_origin_block_row_num (#5)]
107+
├── read rows: 6
108+
├── read size: < 1 KiB
109+
├── partitions total: 1
110+
├── partitions scanned: 1
111+
├── pruning stats: [segments: <read cost: <slt:ignore>, decompress cost: <slt:ignore>, range pruning: 1 to 1 cost: <slt:ignore>>, blocks: <range pruning: 1 to 1 cost: <slt:ignore>>]
112+
├── push downs: [filters: [is_not_null(t_stream_probe._origin_block_row_num (#5))], limit: NONE]
113+
├── apply join filters: [#0]
114+
└── estimated rows: 6.00
115+
116+
query I
117+
SELECT count()
118+
FROM t_stream_build AS b
119+
INNER JOIN (
120+
SELECT _origin_block_row_num AS row_num
121+
FROM t_stream_probe
122+
WHERE _origin_block_row_num IS NOT NULL
123+
) AS p
124+
ON b.row_num = p.row_num
125+
----
126+
3
127+
128+
statement ok
129+
DROP DATABASE test_stream_rf_alias_collision
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
# Test: bloom runtime filter must resolve the physical column name, not an alias.
2+
#
3+
# t_probe has col_a (values 100..404) and col_b (values 1..20).
4+
# Subquery aliases col_b AS col_a, then joins with t_build(col_a = 1,2,3).
5+
# Bug: RF resolves "col_a" to the physical col_a (100..404) instead of
6+
# the aliased source col_b (1..20), bloom rejects all rows → returns 0.
7+
# Expected: 3
8+
9+
statement ok
10+
CREATE OR REPLACE TABLE t_probe(col_a INT NOT NULL, col_b INT NOT NULL) row_per_block = 5;
11+
12+
statement ok
13+
CREATE OR REPLACE TABLE t_build(col_a INT NOT NULL);
14+
15+
statement ok
16+
INSERT INTO t_probe VALUES (100,1),(101,2),(102,3),(103,4),(104,5),(200,6),(201,7),(202,8),(203,9),(204,10),(300,11),(301,12),(302,13),(303,14),(304,15),(400,16),(401,17),(402,18),(403,19),(404,20);
17+
18+
statement ok
19+
INSERT INTO t_build VALUES (1),(2),(3);
20+
21+
query I
22+
SELECT count() FROM t_build AS b INNER JOIN (SELECT col_b AS col_a FROM t_probe) AS p ON b.col_a = p.col_a;
23+
----
24+
3
25+
26+
statement ok
27+
DROP TABLE t_probe;
28+
29+
statement ok
30+
DROP TABLE t_build;
31+
32+
# Test: runtime filter on a tuple inner field must still work correctly.
33+
# The path_indices guard should not break nested column RF behavior.
34+
# t_nested has a Tuple column with named field "key". We join on t:key.
35+
# Expected: 3 matches.
36+
37+
statement ok
38+
CREATE OR REPLACE TABLE t_nested(id INT NOT NULL, t TUPLE(key INT, name VARCHAR) NOT NULL) row_per_block = 5;
39+
40+
statement ok
41+
CREATE OR REPLACE TABLE t_lookup(key INT NOT NULL);
42+
43+
statement ok
44+
INSERT INTO t_nested VALUES (1,(10,'a')),(2,(20,'b')),(3,(30,'c')),(4,(40,'d')),(5,(50,'e')),(6,(60,'f')),(7,(70,'g')),(8,(80,'h')),(9,(90,'i')),(10,(100,'j'));
45+
46+
statement ok
47+
INSERT INTO t_lookup VALUES (10),(20),(30);
48+
49+
query I
50+
SELECT count() FROM t_lookup AS b INNER JOIN t_nested AS p ON b.key = p.t:key;
51+
----
52+
3
53+
54+
statement ok
55+
DROP TABLE t_nested;
56+
57+
statement ok
58+
DROP TABLE t_lookup;
59+
60+
# Test: tuple inner field aliased to collide with a top-level column.
61+
#
62+
# t_data has both a top-level "key" (values 100..109) and a tuple field
63+
# t:key (values 1..10). Subquery aliases t:key AS key, which collides
64+
# with the physical top-level "key" column.
65+
# Bug scenario: RF resolves "key" to the physical top-level column
66+
# (100..109) instead of the tuple leaf t:key (1..10), bloom rejects all.
67+
# Expected: 3
68+
69+
statement ok
70+
CREATE OR REPLACE TABLE t_data(key INT NOT NULL, t TUPLE(key INT, name VARCHAR) NOT NULL) row_per_block = 5;
71+
72+
statement ok
73+
CREATE OR REPLACE TABLE t_dim(key INT NOT NULL);
74+
75+
statement ok
76+
INSERT INTO t_data VALUES (100,(1,'a')),(101,(2,'b')),(102,(3,'c')),(103,(4,'d')),(104,(5,'e')),(105,(6,'f')),(106,(7,'g')),(107,(8,'h')),(108,(9,'i')),(109,(10,'j'));
77+
78+
statement ok
79+
INSERT INTO t_dim VALUES (1),(2),(3);
80+
81+
query I
82+
SELECT count() FROM t_dim AS d INNER JOIN (SELECT t:key AS key FROM t_data) AS p ON d.key = p.key;
83+
----
84+
3
85+
86+
statement ok
87+
DROP TABLE t_data;
88+
89+
statement ok
90+
DROP TABLE t_dim;

0 commit comments

Comments
 (0)