Skip to content

Commit e7be9bb

Browse files
committed
fix(unparser): Fix column alias rewriting for Filter nodes preserved by Inexact filter pushdown
1 parent bc2b36c commit e7be9bb

3 files changed

Lines changed: 366 additions & 1 deletion

File tree

datafusion/sql/src/unparser/plan.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1176,6 +1176,43 @@ impl Unparser<'_> {
11761176
}
11771177
Ok(ret)
11781178
}
1179+
// Handle Filter between SubqueryAlias and TableScan (e.g. Inexact filter pushdown) or
1180+
// manually created plan. Rewrite predicate column references to use the alias.
1181+
// Skip predicates with subquery expressions — TableAliasRewriter
1182+
// cannot rewrite OuterReferenceColumn inside subquery LogicalPlans.
1183+
// Returning None lets the caller wrap the plan as a derived table,
1184+
// preserving the original table name for outer references and generate correct SQL.
1185+
LogicalPlan::Filter(filter) => {
1186+
if filter.predicate.exists(|e| {
1187+
Ok(matches!(
1188+
e,
1189+
Expr::Exists(_) | Expr::InSubquery(_) | Expr::ScalarSubquery(_)
1190+
))
1191+
})? {
1192+
return Ok(None);
1193+
}
1194+
1195+
if let Some(plan) = self.unparse_table_scan_pushdown(
1196+
&filter.input,
1197+
alias.clone(),
1198+
already_projected,
1199+
)? {
1200+
let predicate = if let Some(ref alias_name) = alias {
1201+
let mut rewriter = TableAliasRewriter {
1202+
table_schema: plan.schema().as_arrow(),
1203+
alias_name: alias_name.clone(),
1204+
};
1205+
filter.predicate.clone().rewrite(&mut rewriter).data()?
1206+
} else {
1207+
filter.predicate.clone()
1208+
};
1209+
Ok(Some(
1210+
LogicalPlanBuilder::from(plan).filter(predicate)?.build()?,
1211+
))
1212+
} else {
1213+
Ok(None)
1214+
}
1215+
}
11791216
// SubqueryAlias could be rewritten to a plan with a projection as the top node by [rewrite::subquery_alias_inner_query_and_columns].
11801217
// The inner table scan could be a scan with pushdown operations.
11811218
LogicalPlan::Projection(projection) => {

datafusion/sql/src/unparser/utils.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,16 @@ pub(crate) fn try_transform_to_simple_table_scan_with_filters(
365365
alias_name: alias_name.clone(),
366366
});
367367

368+
// Rewrite already-collected Filter node predicates to use the
369+
// table alias so they can be properly deduplicated against the
370+
// rewritten TableScan filters below.
371+
if let Some(ref mut rewriter) = filter_alias_rewriter {
372+
filters = filters
373+
.into_iter()
374+
.map(|expr| expr.rewrite(rewriter).data())
375+
.collect::<Result<IndexSet<_>, _>>()?;
376+
}
377+
368378
// rewrite filters to use table alias if present
369379
let table_scan_filters = table_scan
370380
.filters

datafusion/sql/tests/cases/plan_to_sql.rs

Lines changed: 319 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use datafusion_expr::test::function_stub::{
2828
use datafusion_expr::{
2929
EmptyRelation, Expr, Extension, LogicalPlan, LogicalPlanBuilder, Union,
3030
UserDefinedLogicalNode, UserDefinedLogicalNodeCore, WindowFrame,
31-
WindowFunctionDefinition, cast, col, lit, table_scan, wildcard,
31+
WindowFunctionDefinition, cast, col, lit, not_exists, table_scan, wildcard,
3232
};
3333
use datafusion_functions::unicode;
3434
use datafusion_functions_aggregate::grouping::grouping_udaf;
@@ -2904,3 +2904,321 @@ fn test_json_access_3() {
29042904
@r#"SELECT (j1.j1_string : 'field.inner1[''inner2'']') FROM j1"#
29052905
);
29062906
}
2907+
2908+
/// Verifies that `SubqueryAlias` wrapping a `Filter` over a `TableScan` with
2909+
/// pushdown filters correctly rewrites column references to the alias name.
2910+
///
2911+
/// Three scenarios are covered:
2912+
///
2913+
/// 1. **Inexact pushdown** — the same predicate appears in both the `TableScan`
2914+
/// filters and the `Filter` node (the optimizer keeps the `Filter` for
2915+
/// re-checking):
2916+
/// - SubqueryAlias: n1 / Filter: nation.n_name = 'FRANCE' / TableScan: nation filters=[nation.n_name = 'FRANCE']
2917+
///
2918+
/// 2. **Mixed pushdown** — the `Filter` node contains additional predicates
2919+
/// beyond what was pushed into the `TableScan` (e.g. some filters returned
2920+
/// `Unsupported` from `supports_filters_pushdown`):
2921+
/// - SubqueryAlias: n1 / Filter: nation.n_name = 'FRANCE' AND nation.n_nationkey > 10 / TableScan: nation filters=[nation.n_name = 'FRANCE']
2922+
///
2923+
/// 3. **Disjoint predicates** — the `Filter` and `TableScan` have completely
2924+
/// different predicates (no duplicates):
2925+
/// - SubqueryAlias: n1 / Filter: nation.n_nationkey > 10 / TableScan: nation filters=[nation.n_name = 'FRANCE']
2926+
#[test]
2927+
fn test_subquery_alias_with_filter_over_table_scan_pushdown() -> Result<()> {
2928+
let schema = Schema::new(vec![
2929+
Field::new("n_nationkey", DataType::Int32, false),
2930+
Field::new("n_name", DataType::Utf8, false),
2931+
]);
2932+
2933+
// Scenario 1: Inexact pushdown — same predicate in both TableScan and Filter
2934+
let scan = table_scan_with_filters(
2935+
Some("nation"),
2936+
&schema,
2937+
Some(vec![0, 1]),
2938+
vec![col("n_name").eq(lit("FRANCE"))],
2939+
)?
2940+
.build()?;
2941+
2942+
let filtered = LogicalPlanBuilder::from(scan)
2943+
.filter(col("nation.n_name").eq(lit("FRANCE")))?
2944+
.build()?;
2945+
2946+
let aliased = LogicalPlanBuilder::from(filtered).alias("n1")?.build()?;
2947+
2948+
let sql = plan_to_sql(&aliased)?;
2949+
// Duplicate filter is expected: both the Filter node and TableScan carry
2950+
// the same predicate for Inexact pushdown. Deduplicating filters that may
2951+
// be duplicated by Inexact/partial pushdown does not affect query
2952+
// correctness and can be optimized separately.
2953+
assert_snapshot!(
2954+
sql,
2955+
@"SELECT n1.n_nationkey, n1.n_name FROM nation AS n1 WHERE (n1.n_name = 'FRANCE') AND (n1.n_name = 'FRANCE')"
2956+
);
2957+
2958+
// Scenario 2: Mixed pushdown — Filter has an additional predicate not in TableScan
2959+
let scan = table_scan_with_filters(
2960+
Some("nation"),
2961+
&schema,
2962+
Some(vec![0, 1]),
2963+
vec![col("n_name").eq(lit("FRANCE"))],
2964+
)?
2965+
.build()?;
2966+
2967+
let filtered = LogicalPlanBuilder::from(scan)
2968+
.filter(
2969+
col("nation.n_name")
2970+
.eq(lit("FRANCE"))
2971+
.and(col("nation.n_nationkey").gt(lit(10))),
2972+
)?
2973+
.build()?;
2974+
2975+
let aliased = LogicalPlanBuilder::from(filtered).alias("n1")?.build()?;
2976+
2977+
let sql = plan_to_sql(&aliased)?;
2978+
// The `n_name = 'FRANCE'` predicate appears twice: once from the Filter
2979+
// node (which also carries the extra `n_nationkey > 10`) and once from
2980+
// the TableScan pushdown filters. This is correct but redundant — dedup
2981+
// for Inexact/partial pushdown duplicates can be optimized separately.
2982+
assert_snapshot!(
2983+
sql,
2984+
@"SELECT n1.n_nationkey, n1.n_name FROM nation AS n1 WHERE ((n1.n_name = 'FRANCE') AND (n1.n_nationkey > 10)) AND (n1.n_name = 'FRANCE')"
2985+
);
2986+
2987+
// Scenario 3: Disjoint predicates — Filter and TableScan have different predicates
2988+
let scan = table_scan_with_filters(
2989+
Some("nation"),
2990+
&schema,
2991+
Some(vec![0, 1]),
2992+
vec![col("n_name").eq(lit("FRANCE"))],
2993+
)?
2994+
.build()?;
2995+
2996+
let filtered = LogicalPlanBuilder::from(scan)
2997+
.filter(col("nation.n_nationkey").gt(lit(10)))?
2998+
.build()?;
2999+
3000+
let aliased = LogicalPlanBuilder::from(filtered).alias("n1")?.build()?;
3001+
3002+
let sql = plan_to_sql(&aliased)?;
3003+
// No duplicate: Filter predicate differs from TableScan pushdown filter.
3004+
assert_snapshot!(
3005+
sql,
3006+
@"SELECT n1.n_nationkey, n1.n_name FROM nation AS n1 WHERE (n1.n_nationkey > 10) AND (n1.n_name = 'FRANCE')"
3007+
);
3008+
3009+
Ok(())
3010+
}
3011+
3012+
/// Verifies that a `Filter` node above a `TableScan` with pushdown filters
3013+
/// (without a `SubqueryAlias`) is handled correctly by the SQL conversion path.
3014+
#[test]
3015+
fn test_filter_over_table_scan_pushdown_no_alias() -> Result<()> {
3016+
let schema = Schema::new(vec![
3017+
Field::new("n_nationkey", DataType::Int32, false),
3018+
Field::new("n_name", DataType::Utf8, false),
3019+
]);
3020+
3021+
// Same predicate in both TableScan and Filter (Inexact pushdown, no alias)
3022+
let scan = table_scan_with_filters(
3023+
Some("nation"),
3024+
&schema,
3025+
Some(vec![0, 1]),
3026+
vec![col("n_name").eq(lit("FRANCE"))],
3027+
)?
3028+
.build()?;
3029+
3030+
let filtered = LogicalPlanBuilder::from(scan)
3031+
.filter(col("nation.n_name").eq(lit("FRANCE")))?
3032+
.build()?;
3033+
3034+
let sql = plan_to_sql(&filtered)?;
3035+
// Duplicate filter: same predicate in Filter node and TableScan pushdown.
3036+
// Dedup for Inexact/partial pushdown duplicates does not affect correctness
3037+
// and can be optimized separately.
3038+
assert_snapshot!(
3039+
sql,
3040+
@"SELECT nation.n_nationkey, nation.n_name FROM nation WHERE (nation.n_name = 'FRANCE') AND (nation.n_name = 'FRANCE')"
3041+
);
3042+
3043+
// Filter has additional predicate beyond what was pushed down
3044+
let scan = table_scan_with_filters(
3045+
Some("nation"),
3046+
&schema,
3047+
Some(vec![0, 1]),
3048+
vec![col("n_name").eq(lit("FRANCE"))],
3049+
)?
3050+
.build()?;
3051+
3052+
let filtered = LogicalPlanBuilder::from(scan)
3053+
.filter(
3054+
col("nation.n_name")
3055+
.eq(lit("FRANCE"))
3056+
.and(col("nation.n_nationkey").gt(lit(10))),
3057+
)?
3058+
.build()?;
3059+
3060+
let sql = plan_to_sql(&filtered)?;
3061+
// `n_name = 'FRANCE'` appears in both Filter and TableScan — redundant
3062+
// but correct. Dedup can be optimized separately.
3063+
assert_snapshot!(
3064+
sql,
3065+
@"SELECT nation.n_nationkey, nation.n_name FROM nation WHERE ((nation.n_name = 'FRANCE') AND (nation.n_nationkey > 10)) AND (nation.n_name = 'FRANCE')"
3066+
);
3067+
3068+
// Disjoint predicates — Filter and TableScan have different predicates
3069+
let scan = table_scan_with_filters(
3070+
Some("nation"),
3071+
&schema,
3072+
Some(vec![0, 1]),
3073+
vec![col("n_name").eq(lit("FRANCE"))],
3074+
)?
3075+
.build()?;
3076+
3077+
let filtered = LogicalPlanBuilder::from(scan)
3078+
.filter(col("nation.n_nationkey").gt(lit(10)))?
3079+
.build()?;
3080+
3081+
let sql = plan_to_sql(&filtered)?;
3082+
// No duplicate: Filter predicate differs from TableScan pushdown filter.
3083+
assert_snapshot!(
3084+
sql,
3085+
@"SELECT nation.n_nationkey, nation.n_name FROM nation WHERE (nation.n_nationkey > 10) AND (nation.n_name = 'FRANCE')"
3086+
);
3087+
3088+
Ok(())
3089+
}
3090+
3091+
/// Verifies that when `SubqueryAlias(Filter(TableScan))` is used inside a JOIN,
3092+
/// `try_transform_to_simple_table_scan_with_filters` rewrites the Filter
3093+
/// predicate with the alias so `nation.n_name` becomes `n1.n_name` and the
3094+
/// duplicate filter is properly deduplicated.
3095+
#[test]
3096+
fn test_join_with_filter_over_aliased_table_scan_pushdown() -> Result<()> {
3097+
let nation_schema = Schema::new(vec![
3098+
Field::new("n_nationkey", DataType::Int32, false),
3099+
Field::new("n_name", DataType::Utf8, false),
3100+
]);
3101+
3102+
let supplier_schema = Schema::new(vec![
3103+
Field::new("s_suppkey", DataType::Int32, false),
3104+
Field::new("s_nationkey", DataType::Int32, false),
3105+
]);
3106+
3107+
let supplier_scan =
3108+
table_scan(Some("supplier"), &supplier_schema, Some(vec![0, 1]))?.build()?;
3109+
3110+
// Build: SubqueryAlias(n1, Filter(nation.n_name IN ('FRANCE','GERMANY'),
3111+
// TableScan(nation, partial_filters=[same])))
3112+
let nation_scan = table_scan_with_filters(
3113+
Some("nation"),
3114+
&nation_schema,
3115+
Some(vec![0, 1]),
3116+
vec![
3117+
col("n_name")
3118+
.eq(lit("FRANCE"))
3119+
.or(col("n_name").eq(lit("GERMANY"))),
3120+
],
3121+
)?
3122+
.build()?;
3123+
3124+
let nation_filtered = LogicalPlanBuilder::from(nation_scan)
3125+
.filter(
3126+
col("nation.n_name")
3127+
.eq(lit("FRANCE"))
3128+
.or(col("nation.n_name").eq(lit("GERMANY"))),
3129+
)?
3130+
.build()?;
3131+
3132+
let nation_aliased = LogicalPlanBuilder::from(nation_filtered)
3133+
.alias("n1")?
3134+
.build()?;
3135+
3136+
let join_plan = LogicalPlanBuilder::from(supplier_scan)
3137+
.join(
3138+
nation_aliased,
3139+
datafusion_expr::JoinType::Inner,
3140+
(vec!["supplier.s_nationkey"], vec!["n1.n_nationkey"]),
3141+
None,
3142+
)?
3143+
.build()?;
3144+
3145+
let sql = plan_to_sql(&join_plan)?;
3146+
// The filter predicate should use alias n1 (not original table name "nation").
3147+
// With Inexact pushdown, the Filter and TableScan carry the same predicate;
3148+
// after alias rewriting both become identical and are deduplicated.
3149+
assert_snapshot!(
3150+
sql,
3151+
@r#"SELECT supplier.s_suppkey, supplier.s_nationkey, n1.n_nationkey, n1.n_name FROM supplier INNER JOIN nation AS n1 ON supplier.s_nationkey = n1.n_nationkey AND ((n1.n_name = 'FRANCE') OR (n1.n_name = 'GERMANY'))"#
3152+
);
3153+
3154+
Ok(())
3155+
}
3156+
3157+
/// Verifies that when a `Filter` predicate contains subquery expressions
3158+
/// (e.g. `NOT EXISTS`), `unparse_table_scan_pushdown` returns `None` so the
3159+
/// caller's `SubqueryAlias` handler falls back to wrapping the inner plan as a
3160+
/// derived table: `(SELECT ... FROM table WHERE ...) AS alias`.
3161+
///
3162+
/// This preserves the original table name inside the derived table, which is
3163+
/// required for `OuterReferenceColumn` expressions (e.g. `outer_ref(customer.c_custkey)`)
3164+
/// that refer to the original table name rather than the alias.
3165+
#[test]
3166+
fn test_filter_with_subquery_over_aliased_table_scan_pushdown() -> Result<()> {
3167+
let customer_schema = Schema::new(vec![
3168+
Field::new("c_custkey", DataType::Int32, false),
3169+
Field::new("c_phone", DataType::Utf8, false),
3170+
Field::new("c_acctbal", DataType::Float64, false),
3171+
]);
3172+
3173+
let orders_schema = Schema::new(vec![
3174+
Field::new("o_orderkey", DataType::Int32, false),
3175+
Field::new("o_custkey", DataType::Int32, false),
3176+
]);
3177+
3178+
// Build a NOT EXISTS subquery:
3179+
// NOT EXISTS (SELECT o_orderkey, o_custkey FROM orders
3180+
// WHERE orders.o_custkey = outer_ref(customer.c_custkey))
3181+
let orders_scan =
3182+
table_scan(Some("orders"), &orders_schema, Some(vec![0, 1]))?.build()?;
3183+
3184+
let subquery_filter = LogicalPlanBuilder::from(orders_scan)
3185+
.filter(col("orders.o_custkey").eq(Expr::OuterReferenceColumn(
3186+
Arc::new(Field::new("c_custkey", DataType::Int32, false)),
3187+
Column::new(Some("customer"), "c_custkey"),
3188+
)))?
3189+
.build()?;
3190+
3191+
let not_exists_expr = not_exists(Arc::new(subquery_filter));
3192+
3193+
// Build the plan mirroring Q22 structure:
3194+
// SubqueryAlias(custsale,
3195+
// Projection(c_custkey, c_acctbal),
3196+
// Filter(c_acctbal > 0 AND NOT EXISTS(...),
3197+
// TableScan(customer, partial_filters=[c_acctbal > 0])))
3198+
let customer_scan = table_scan_with_filters(
3199+
Some("customer"),
3200+
&customer_schema,
3201+
Some(vec![0, 1, 2]),
3202+
vec![col("c_acctbal").gt(lit(0.0))],
3203+
)?
3204+
.build()?;
3205+
3206+
let filtered = LogicalPlanBuilder::from(customer_scan)
3207+
.filter(col("customer.c_acctbal").gt(lit(0.0)).and(not_exists_expr))?
3208+
.project(vec![col("customer.c_custkey"), col("customer.c_acctbal")])?
3209+
.build()?;
3210+
3211+
let aliased = LogicalPlanBuilder::from(filtered)
3212+
.alias("custsale")?
3213+
.build()?;
3214+
3215+
let sql = plan_to_sql(&aliased)?;
3216+
// The subquery produces a derived table that preserves the original "customer" table name,
3217+
// so outer_ref(customer.c_custkey) remains valid inside the NOT EXISTS subquery.
3218+
assert_snapshot!(
3219+
sql,
3220+
@"SELECT * FROM (SELECT customer.c_custkey, customer.c_acctbal FROM customer WHERE ((customer.c_acctbal > 0.0) AND NOT EXISTS (SELECT orders.o_orderkey, orders.o_custkey FROM orders WHERE (orders.o_custkey = customer.c_custkey))) AND (customer.c_acctbal > 0.0)) AS custsale"
3221+
);
3222+
3223+
Ok(())
3224+
}

0 commit comments

Comments
 (0)