Skip to content
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e2b5e5c
feat: enhance schema alignment and recursion handling
kosiew May 5, 2026
f191f7c
feat(datafusion): add direct tests for align_plan_to_schema and docum…
kosiew May 6, 2026
1b39805
feat: improve schema alignment checks in execution plans
kosiew May 6, 2026
c0a6066
feat: Improve performance and clarity in common and recursive query m…
kosiew May 6, 2026
7fa72c2
feat: improve test setup and simplify validation in physical plan
kosiew May 6, 2026
4959978
feat: enhance recursive query functionality with new schema handling …
kosiew May 10, 2026
0b35795
fix: optimize recursive CTE handling to prevent SLT hang
kosiew May 10, 2026
0bdf95c
feat: refactor CTE handling and logical plan simplifications
kosiew May 10, 2026
63f62a8
feat: enhance recursive query validation and testing
kosiew May 10, 2026
8b4786f
feat: enhance recursive CTE handling in DataFusion
kosiew May 11, 2026
07112de
feat(datafusion): enhance recursive query handling and error management
kosiew May 11, 2026
4c17d7a
feat: update recursive_query and physical_planner to use references
kosiew May 11, 2026
365b9ca
Revert to 63f62a829: feat: enhance recursive query validation and tes…
kosiew May 11, 2026
973f93e
feat: enhance recursive query handling by aligning schemas and preser…
kosiew May 11, 2026
ec9aafd
feat: enhance documentation for name preservation and nullability rat…
kosiew May 11, 2026
59e8d92
Revert to 739e1471b: Add reusable plan-time schema alignment helper a…
kosiew May 13, 2026
1c990d7
Merge branch 'main' into nullability-mismatch-22034
kosiew May 13, 2026
02e1abb
feat: add SLT repro for recursive CTE and update nullability handling
kosiew May 13, 2026
869e49a
feat: refactor recursive query handling and nullability management
kosiew May 13, 2026
3389785
fix: update explain_tree.slt to reflect correct type casting in proje…
kosiew May 13, 2026
6953076
fix: correct SUM(0) -> 0 as level in recursive CTE query
kosiew May 13, 2026
4b6f9fa
feat: update reconcile_recursive_query_input_nullability to handle on…
kosiew May 13, 2026
e76aa54
feat: implement central recursive CTE schema helpers
kosiew May 13, 2026
18b06b0
fix: update type casting in projection for explain_analyze test
kosiew May 13, 2026
aa7fb40
feat: update TreeNode::exists usage and optimize CTE handling
kosiew May 13, 2026
7606c81
feat: enhance RecursiveQuery schema handling and add regressions
kosiew May 21, 2026
a204864
fix: amend RecursiveQuery structure and schema handling
kosiew May 21, 2026
ec1c4ca
feat(tests): add edge cases for SQL logic tests in cte.slt
kosiew May 21, 2026
e11d8fa
feat: improve query planning and schema handling
kosiew May 22, 2026
e5920d4
feat: Refactor RecursiveQuery to always rebuild through LogicalPlanBu…
kosiew May 22, 2026
e3b50d4
feat(tests): add edge cases to SLT tests for CTEs
kosiew May 22, 2026
f6d790b
Merge branch 'main' into nullability-mismatch-22034
kosiew May 22, 2026
6bfcfe9
fix: remove RecursiveQuery from datafusion_expr use list in mod.rs
kosiew May 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 59 additions & 6 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,14 @@ impl LogicalPlanBuilder {
// Ensure that the recursive term has the same field types as the static term
let coerced_recursive_term =
coerce_plan_expr_for_schema(recursive_term, self.plan.schema())?;
Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
name,
static_term: self.plan,
recursive_term: Arc::new(coerced_recursive_term),
is_distinct,
})))
Ok(Self::from(LogicalPlan::RecursiveQuery(
RecursiveQuery::try_new(
name,
self.plan,
Arc::new(coerced_recursive_term),
is_distinct,
)?,
)))
}

/// Create a values list based relation, and the schema is inferred from data, consuming
Expand Down Expand Up @@ -2289,6 +2291,7 @@ pub fn unnest_with_options(

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::vec;

use super::*;
Expand Down Expand Up @@ -2367,6 +2370,56 @@ mod tests {
Ok(())
}

/// The static term projects the literal `0` as `n`; the recursive term projects a
/// NULL `Int32` literal under a different name. The resulting plan must keep the
/// static term's column name while reporting the column as nullable.
#[test]
fn recursive_query_schema_widens_nullability_from_recursive_term() -> Result<()> {
    let anchor = LogicalPlanBuilder::empty(true).project(vec![lit(0i32).alias("n")])?;

    let null_int = lit(ScalarValue::Int32(None)).alias("recursive_n");
    let step = LogicalPlanBuilder::empty(true)
        .project(vec![null_int])?
        .build()?;

    let plan = anchor
        .to_recursive_query("t".to_string(), step, false)?
        .build()?;

    let output = plan.schema().field(0);
    // The name comes from the static term, not from `recursive_n`.
    assert_eq!(output.name(), "n");
    // The nullability is picked up from the recursive term.
    assert!(output.is_nullable());
    Ok(())
}

/// Both terms carry a `source` metadata entry at field and schema level, with
/// different values. The recursive query's schema must report the static term's
/// values in both places.
#[test]
fn recursive_query_schema_preserves_static_metadata() -> Result<()> {
    let tagged =
        |value: &str| HashMap::from([("source".to_string(), value.to_string())]);
    let static_metadata = tagged("static");
    let recursive_metadata = tagged("recursive");

    let static_field = Field::new("n", DataType::Int32, false)
        .with_metadata(static_metadata.clone());
    let static_schema =
        Schema::new_with_metadata(vec![static_field], static_metadata.clone());

    let recursive_field = Field::new("recursive_n", DataType::Int32, false)
        .with_metadata(recursive_metadata.clone());
    let recursive_schema =
        Schema::new_with_metadata(vec![recursive_field], recursive_metadata);

    let anchor = table_scan(Some("static_t"), &static_schema, None)?;
    let step = table_scan(Some("recursive_t"), &recursive_schema, None)?.build()?;
    let plan = anchor
        .to_recursive_query("t".to_string(), step, false)?
        .build()?;

    let schema = plan.schema();
    assert_eq!(schema.field(0).metadata(), &static_metadata);
    assert_eq!(schema.metadata(), &static_metadata);
    Ok(())
}

#[test]
fn plan_builder_union() -> Result<()> {
let plan =
Expand Down
187 changes: 179 additions & 8 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,6 @@ impl LogicalPlan {
LogicalPlan::Ddl(ddl) => ddl.schema(),
LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
// we take the schema of the static term as the schema of the entire recursive query
static_term.schema()
}
}
Expand Down Expand Up @@ -1080,12 +1079,12 @@ impl LogicalPlan {
}) => {
self.assert_no_expressions(expr)?;
let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
name: name.clone(),
static_term: Arc::new(static_term),
recursive_term: Arc::new(recursive_term),
is_distinct: *is_distinct,
}))
Ok(LogicalPlan::RecursiveQuery(RecursiveQuery::try_new(
name.clone(),
Arc::new(static_term),
Arc::new(recursive_term),
*is_distinct,
)?))
}
LogicalPlan::Analyze(a) => {
self.assert_no_expressions(expr)?;
Expand Down Expand Up @@ -2246,7 +2245,7 @@ impl PartialOrd for EmptyRelation {
/// intermediate table, then empty the intermediate table.
///
/// [Postgres Docs]: https://www.postgresql.org/docs/current/queries-with.html#QUERIES-WITH-RECURSIVE
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RecursiveQuery {
/// Name of the query
pub name: String,
Expand All @@ -2260,6 +2259,112 @@ pub struct RecursiveQuery {
pub is_distinct: bool,
}

impl RecursiveQuery {
    /// Builds a [`RecursiveQuery`] whose static term exposes the reconciled
    /// output schema of both terms.
    ///
    /// The reconciled schema is computed by `recursive_query_schema` from the
    /// schemas of `static_term` and `recursive_term`; the static term is then
    /// passed through `align_logical_plan_to_schema` so that it reports that
    /// schema. `recursive_term` is stored exactly as given.
    ///
    /// # Errors
    ///
    /// Propagates any error from computing the reconciled schema or from
    /// aligning the static term to it.
    pub fn try_new(
        name: String,
        static_term: Arc<LogicalPlan>,
        recursive_term: Arc<LogicalPlan>,
        is_distinct: bool,
    ) -> Result<Self> {
        let widened =
            recursive_query_schema(static_term.schema(), recursive_term.schema())?;
        align_logical_plan_to_schema(static_term, widened).map(|static_term| Self {
            name,
            static_term,
            recursive_term,
            is_distinct,
        })
    }
}

/// Makes `input` report `schema` as its output schema.
///
/// When the plan's schema already compares equal to `schema`, the plan is
/// returned untouched. Otherwise it is wrapped in a [`Projection`] that selects
/// every input column, in order, by its qualified name and is constructed with
/// `schema` as its declared schema.
fn align_logical_plan_to_schema(
    input: Arc<LogicalPlan>,
    schema: DFSchemaRef,
) -> Result<Arc<LogicalPlan>> {
    if input.schema().as_ref() == schema.as_ref() {
        return Ok(input);
    }

    // One pass-through column reference per input field, addressed by position.
    let passthrough = {
        let input_schema = input.schema();
        (0..input_schema.fields().len())
            .map(|idx| Expr::Column(Column::from(input_schema.qualified_field(idx))))
            .collect()
    };

    let projection = Projection::try_new_with_schema(passthrough, input, schema)?;
    Ok(Arc::new(LogicalPlan::Projection(projection)))
}

/// Computes the output schema of a recursive query from the schemas of its
/// static and recursive terms.
///
/// For every column position the result takes the qualifier, name, data type
/// and field metadata from the static term, and marks the field nullable when
/// either term's field is nullable. Schema-level metadata is copied from the
/// static schema.
///
/// # Errors
///
/// Returns a planning error when the two schemas have a different number of
/// fields, or when the fields at some position have different data types.
/// Errors from [`DFSchema::new_with_metadata`] are propagated.
fn recursive_query_schema(
    static_schema: &DFSchema,
    recursive_schema: &DFSchema,
) -> Result<DFSchemaRef> {
    let static_fields = static_schema.fields();
    let recursive_fields = recursive_schema.fields();

    if static_fields.len() != recursive_fields.len() {
        return plan_err!(
            "RecursiveQuery static and recursive terms have different number of columns: {} != {}",
            static_schema.fields().len(),
            recursive_schema.fields().len()
        );
    }

    let mut merged = Vec::with_capacity(static_fields.len());
    for (i, (static_field, recursive_field)) in
        static_fields.iter().zip(recursive_fields.iter()).enumerate()
    {
        if static_field.data_type() != recursive_field.data_type() {
            return plan_err!(
                "RecursiveQuery column {i} has different types: static term has {} whereas recursive term has {}",
                static_field.data_type(),
                recursive_field.data_type()
            );
        }

        // Names and qualifiers are taken from the static (anchor) term only, so
        // the recursive term's column names never show up in the CTE schema.
        let (qualifier, _) = static_schema.qualified_field(i);

        // Union-like widening: the column is nullable if either side is, so a
        // nullable recursive expression over a non-nullable anchor
        // (e.g. `SELECT 0 AS level`) is still covered by the declared schema.
        let nullable = static_field.is_nullable() || recursive_field.is_nullable();

        let field = Field::new(
            static_field.name(),
            static_field.data_type().clone(),
            nullable,
        )
        .with_metadata(static_field.metadata().clone());

        merged.push((qualifier.cloned(), Arc::new(field)));
    }

    let schema = DFSchema::new_with_metadata(merged, static_schema.metadata().clone())?;
    Ok(Arc::new(schema))
}

impl PartialOrd for RecursiveQuery {
    /// Orders lexicographically by `name`, `static_term`, `recursive_term`, then
    /// `is_distinct`.
    ///
    /// An `Equal` result from that field-wise comparison is only reported when
    /// the two values also compare equal via `==`; otherwise the values are
    /// treated as unordered and `None` is returned.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let lhs = (
            &self.name,
            &self.static_term,
            &self.recursive_term,
            self.is_distinct,
        );
        let rhs = (
            &other.name,
            &other.static_term,
            &other.recursive_term,
            other.is_distinct,
        );

        match lhs.partial_cmp(&rhs) {
            // Keep `partial_cmp` consistent with `PartialEq`.
            Some(Ordering::Equal) if self != other => None,
            ordering => ordering,
        }
    }
}

/// Values expression. See
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
Expand Down Expand Up @@ -4871,6 +4976,72 @@ mod tests {
);
}

/// Test helper: an `EmptyRelation` plan (producing no row) whose schema consists
/// of the given unqualified `fields` and no schema metadata.
///
/// Panics if the schema cannot be built from `fields`.
fn empty_plan_with_fields(fields: Vec<Field>) -> Arc<LogicalPlan> {
    let schema =
        DFSchema::from_unqualified_fields(fields.into(), HashMap::new()).unwrap();
    let relation = EmptyRelation {
        produce_one_row: false,
        schema: Arc::new(schema),
    };
    Arc::new(LogicalPlan::EmptyRelation(relation))
}

/// A non-nullable static column `a` combined with a nullable recursive column
/// `b`: the static term must be wrapped in a `Projection` that keeps the name
/// `a` but is nullable, while the recursive term is stored as the very same
/// `Arc` that was passed in.
#[test]
fn recursive_query_try_new_aligns_static_term_to_widened_schema() -> Result<()> {
    let static_term =
        empty_plan_with_fields(vec![Field::new("a", DataType::Int32, false)]);
    let recursive_term =
        empty_plan_with_fields(vec![Field::new("b", DataType::Int32, true)]);

    let RecursiveQuery {
        static_term: aligned_static,
        recursive_term: stored_recursive,
        ..
    } = RecursiveQuery::try_new(
        "t".to_string(),
        Arc::clone(&static_term),
        Arc::clone(&recursive_term),
        false,
    )?;

    let aligned_field = aligned_static.schema().field(0);
    assert_eq!(aligned_field.name(), "a");
    assert!(aligned_field.is_nullable());
    assert!(matches!(
        aligned_static.as_ref(),
        LogicalPlan::Projection(_)
    ));
    assert!(
        Arc::ptr_eq(&stored_recursive, &recursive_term),
        "recursive term should not be wrapped in a schema-only Projection"
    );
    Ok(())
}

/// One static column against two recursive columns must be rejected with a
/// planning error that reports both column counts.
#[test]
fn recursive_query_try_new_rejects_mismatched_column_count() {
    let one_column = vec![Field::new("a", DataType::Int32, false)];
    let two_columns = vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ];

    let outcome = RecursiveQuery::try_new(
        "t".to_string(),
        empty_plan_with_fields(one_column),
        empty_plan_with_fields(two_columns),
        false,
    );
    let err = outcome.unwrap_err();

    assert_snapshot!(err.strip_backtrace(), @"Error during planning: RecursiveQuery static and recursive terms have different number of columns: 1 != 2");
}

/// An `Int32` static column against an `Int64` recursive column must be
/// rejected with a planning error that names the column index and both types.
#[test]
fn recursive_query_try_new_rejects_mismatched_types() {
    let int32_column = vec![Field::new("a", DataType::Int32, false)];
    let int64_column = vec![Field::new("a", DataType::Int64, false)];

    let outcome = RecursiveQuery::try_new(
        "t".to_string(),
        empty_plan_with_fields(int32_column),
        empty_plan_with_fields(int64_column),
        false,
    );
    let err = outcome.unwrap_err();

    assert_snapshot!(err.strip_backtrace(), @"Error during planning: RecursiveQuery column 0 has different types: static term has Int32 whereas recursive term has Int64");
}

#[test]
fn test_partial_eq_hash_and_partial_ord() {
let empty_values = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
Expand Down
10 changes: 6 additions & 4 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,16 +329,18 @@ impl TreeNode for LogicalPlan {
static_term,
recursive_term,
is_distinct,
}) => (static_term, recursive_term).map_elements(f)?.update_data(
..
}) => (static_term, recursive_term).map_elements(f)?.map_data(
|(static_term, recursive_term)| {
LogicalPlan::RecursiveQuery(RecursiveQuery {
RecursiveQuery::try_new(
name,
static_term,
recursive_term,
is_distinct,
})
)
.map(LogicalPlan::RecursiveQuery)
},
),
)?,
LogicalPlan::Statement(stmt) => match stmt {
Statement::Prepare(p) => p
.input
Expand Down
Loading
Loading