Skip to content

Commit 38b7d8c

Browse files
authored
fix(cubesql): Don't mix filter params between subqueries in SQL push down (#10996)
1 parent a321af5 commit 38b7d8c

2 files changed

Lines changed: 113 additions & 5 deletions

File tree

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,43 @@ impl SqlQuery {
176176
index
177177
}
178178

179-
pub fn extend_values(&mut self, values: impl IntoIterator<Item = Option<String>>) {
180-
self.values.extend(values);
179+
/// Adds values of an independently generated subquery, returning the mapping from
180+
/// subquery-local value indexes to indexes in `self.values`. Placeholders in the
181+
/// subquery SQL must be rewritten with [`SqlQuery::remap_placeholders`] using this mapping.
182+
pub fn add_values(&mut self, values: impl IntoIterator<Item = Option<String>>) -> Vec<usize> {
183+
values.into_iter().map(|v| self.add_value(v)).collect()
184+
}
185+
186+
pub fn remap_placeholders(sql: &str, mapping: &[usize]) -> result::Result<String, CubeError> {
187+
static REGEX: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\$(\d+)\$").unwrap());
188+
189+
let mut error = None;
190+
let replaced = REGEX.replace_all(sql, |c: &Captures<'_>| {
191+
let res = c[1]
192+
.parse::<usize>()
193+
.map_err(|e| CubeError::internal(format!("Can't parse param index: {e}")))
194+
.and_then(|index| {
195+
mapping.get(index).ok_or_else(|| {
196+
CubeError::internal(format!(
197+
"Param index {index} out of range for mapping of {} values",
198+
mapping.len()
199+
))
200+
})
201+
});
202+
match res {
203+
Ok(new_index) => format!("${new_index}$"),
204+
Err(e) => {
205+
if error.is_none() {
206+
error = Some(e);
207+
}
208+
"".to_string()
209+
}
210+
}
211+
});
212+
match error {
213+
None => Ok(replaced.to_string()),
214+
Some(e) => Err(e),
215+
}
181216
}
182217

183218
pub fn replace_sql(&mut self, sql: String) {
@@ -1254,8 +1289,11 @@ impl WrappedSelectNode {
12541289
)
12551290
.await?;
12561291

1292+
// Subquery SQL was generated independently, so its placeholders
1293+
// reference its own values and must be remapped to the combined values
12571294
let (sql_string, new_values) = subquery_sql.unpack();
1258-
sql.extend_values(new_values);
1295+
let mapping = sql.add_values(new_values);
1296+
let sql_string = SqlQuery::remap_placeholders(&sql_string, &mapping)?;
12591297
// TODO why only field 0 is a key?
12601298
let field = subquery.schema().field(0);
12611299
subqueries_sql.insert(field.qualified_name(), sql_string);
@@ -3445,8 +3483,11 @@ impl WrappedSelectNode {
34453483
Some(data_source),
34463484
)
34473485
.await?;
3486+
// Join subquery SQL was generated independently, so its placeholders
3487+
// reference its own values and must be remapped to the combined values
34483488
let (subq_sql_string, new_values) = subq_sql.sql.unpack();
3449-
sql.extend_values(new_values);
3489+
let mapping = sql.add_values(new_values);
3490+
let subq_sql_string = SqlQuery::remap_placeholders(&subq_sql_string, &mapping)?;
34503491
let subq_alias = subq_sql.from_alias;
34513492
// Expect that subq_sql.column_remapping already incorporates subq_alias/
34523493
// TODO does it?
@@ -3842,8 +3883,12 @@ impl WrappedSelectNode {
38423883
&HashMap::new(),
38433884
)?;
38443885

3886+
// Join subquery SQL was generated independently, so its placeholders
3887+
// reference its own values and must be remapped to the combined values
38453888
let (join_sql_str, new_values) = join_sql.unpack();
3846-
sql.extend_values(new_values);
3889+
let mapping = sql.add_values(new_values);
3890+
let join_sql_str = SqlQuery::remap_placeholders(&join_sql_str, &mapping)?;
3891+
let join_condition_sql = SqlQuery::remap_placeholders(&join_condition_sql, &mapping)?;
38473892
if let Some(join_column_remapping) = join_column_remapping {
38483893
if let Some(column_remapping) = column_remapping.as_mut() {
38493894
column_remapping.extend(join_column_remapping);

rust/cubesql/cubesql/src/compile/test/test_wrapper.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2018,3 +2018,66 @@ async fn test_wrapper_between() {
20182018
.sql
20192019
.contains("BETWEEN $1 AND $2"));
20202020
}
2021+
2022+
#[tokio::test]
2023+
async fn test_wrapper_subqueries_filter_params_not_mixed() {
2024+
if !Rewriter::sql_push_down_enabled() {
2025+
return;
2026+
}
2027+
init_testing_logger();
2028+
2029+
let query_plan = convert_select_to_query_plan(
2030+
// language=PostgreSQL
2031+
r#"
2032+
SELECT
2033+
customer_gender
2034+
FROM KibanaSampleDataEcommerce
2035+
WHERE
2036+
customer_gender IN (
2037+
SELECT
2038+
customer_gender
2039+
FROM KibanaSampleDataEcommerce
2040+
WHERE LOWER(notes) = 'sub_notes'
2041+
GROUP BY 1
2042+
)
2043+
AND notes IN (
2044+
SELECT
2045+
notes
2046+
FROM KibanaSampleDataEcommerce
2047+
WHERE LOWER(notes) IN ('male_notes', 'female_notes', 'other_notes')
2048+
GROUP BY 1
2049+
)
2050+
GROUP BY 1
2051+
;
2052+
"#
2053+
.to_string(),
2054+
DatabaseProtocol::PostgreSQL,
2055+
)
2056+
.await;
2057+
2058+
let physical_plan = query_plan.as_physical_plan().await.unwrap();
2059+
println!(
2060+
"Physical plan: {}",
2061+
displayable(physical_plan.as_ref()).indent()
2062+
);
2063+
2064+
let wrapped_sql = query_plan
2065+
.as_logical_plan()
2066+
.find_cube_scan_wrapped_sql()
2067+
.wrapped_sql;
2068+
2069+
// Each subquery is generated with its own params; on merge, placeholders must be
2070+
// remapped to the combined values so params don't mix between subqueries
2071+
let placeholder_re = Regex::new(r"\$(\d+)\b").unwrap();
2072+
let params_in_sql_order = placeholder_re
2073+
.captures_iter(&wrapped_sql.sql)
2074+
.map(|c| {
2075+
let index = c[1].parse::<usize>().unwrap() - 1;
2076+
wrapped_sql.values[index].clone().unwrap()
2077+
})
2078+
.collect::<Vec<_>>();
2079+
assert_eq!(
2080+
params_in_sql_order,
2081+
vec!["sub_notes", "male_notes", "female_notes", "other_notes"]
2082+
);
2083+
}

0 commit comments

Comments
 (0)