Skip to content

Commit 33bcfcd

Browse files
committed
.
1 parent cab69a1 commit 33bcfcd

12 files changed

Lines changed: 426 additions & 69 deletions

File tree

datafusion/catalog/src/cte_worktable.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,16 @@ use crate::{ScanArgs, ScanResult, Session, TableProvider};
3636
pub struct CteWorkTable {
3737
/// The name of the CTE work table
3838
name: String,
39-
/// This schema must be shared across both the static and recursive terms of a recursive query
39+
/// Schema exposed by recursive self-references while planning the recursive term.
40+
///
41+
/// This is a conservative work-table schema, not the final recursive query output
42+
/// schema. For example, the SQL planner may mark fields nullable here so recursive
43+
/// references do not inherit unsound anchor-term nullability assumptions.
4044
table_schema: SchemaRef,
4145
}
4246

4347
impl CteWorkTable {
44-
/// construct a new CteWorkTable with the given name and schema
45-
/// This schema must match the schema of the recursive term of the query
46-
/// Since the scan method will contain an physical plan that assumes this schema
48+
/// Construct a new CteWorkTable with the given name and self-reference schema.
4749
pub fn new(name: &str, table_schema: SchemaRef) -> Self {
4850
Self {
4951
name: name.to_owned(),
@@ -56,7 +58,7 @@ impl CteWorkTable {
5658
&self.name
5759
}
5860

59-
/// The schema of the recursive term of the query
61+
/// The schema exposed by scans of the recursive self-reference.
6062
pub fn schema(&self) -> SchemaRef {
6163
Arc::clone(&self.table_schema)
6264
}

datafusion/core/src/physical_planner.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1785,11 +1785,15 @@ impl DefaultPhysicalPlanner {
17851785
}
17861786
}
17871787
LogicalPlan::RecursiveQuery(RecursiveQuery {
1788-
name, is_distinct, ..
1788+
name,
1789+
is_distinct,
1790+
schema,
1791+
..
17891792
}) => {
17901793
let [static_term, recursive_term] = children.two()?;
17911794
Arc::new(RecursiveQueryExec::try_new(
17921795
name.clone(),
1796+
Arc::clone(schema.inner()),
17931797
static_term,
17941798
recursive_term,
17951799
*is_distinct,

datafusion/core/tests/sql/explain_analyze.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1014,7 +1014,7 @@ async fn parquet_recursive_projection_pushdown() -> Result<()> {
10141014
SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
10151015
RecursiveQueryExec: name=number_series, is_distinct=false
10161016
CoalescePartitionsExec
1017-
ProjectionExec: expr=[id@0 as id, 1 as level]
1017+
ProjectionExec: expr=[CAST(id@0 AS Int64) as id, CAST(1 AS Int64) as level]
10181018
FilterExec: id@0 = 1
10191019
RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1
10201020
DataSourceExec: file_groups={1 group: [[TMP_DIR/hierarchy.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)]

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,12 +192,13 @@ impl LogicalPlanBuilder {
192192
// Ensure that the recursive term has the same field types as the static term
193193
let coerced_recursive_term =
194194
coerce_plan_expr_for_schema(recursive_term, self.plan.schema())?;
195-
Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
195+
let recursive_query = RecursiveQuery::try_new(
196196
name,
197-
static_term: self.plan,
198-
recursive_term: Arc::new(coerced_recursive_term),
197+
self.plan,
198+
Arc::new(coerced_recursive_term),
199199
is_distinct,
200-
})))
200+
)?;
201+
Ok(Self::from(LogicalPlan::RecursiveQuery(recursive_query)))
201202
}
202203

203204
/// Create a values list based relation, and the schema is inferred from data, consuming

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 169 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -354,10 +354,7 @@ impl LogicalPlan {
354354
LogicalPlan::Copy(CopyTo { output_schema, .. }) => output_schema,
355355
LogicalPlan::Ddl(ddl) => ddl.schema(),
356356
LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
357-
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
358-
// we take the schema of the static term as the schema of the entire recursive query
359-
static_term.schema()
360-
}
357+
LogicalPlan::RecursiveQuery(RecursiveQuery { schema, .. }) => schema,
361358
}
362359
}
363360

@@ -741,7 +738,14 @@ impl LogicalPlan {
741738
};
742739
Ok(LogicalPlan::Distinct(distinct))
743740
}
744-
LogicalPlan::RecursiveQuery(_) => Ok(self),
741+
LogicalPlan::RecursiveQuery(RecursiveQuery {
742+
name,
743+
static_term,
744+
recursive_term,
745+
is_distinct,
746+
schema: _,
747+
}) => RecursiveQuery::try_new(name, static_term, recursive_term, is_distinct)
748+
.map(LogicalPlan::RecursiveQuery),
745749
LogicalPlan::Analyze(_) => Ok(self),
746750
LogicalPlan::Explain(_) => Ok(self),
747751
LogicalPlan::TableScan(_) => Ok(self),
@@ -1081,12 +1085,13 @@ impl LogicalPlan {
10811085
}) => {
10821086
self.assert_no_expressions(expr)?;
10831087
let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
1084-
Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
1085-
name: name.clone(),
1086-
static_term: Arc::new(static_term),
1087-
recursive_term: Arc::new(recursive_term),
1088-
is_distinct: *is_distinct,
1089-
}))
1088+
RecursiveQuery::try_new(
1089+
name.clone(),
1090+
Arc::new(static_term),
1091+
Arc::new(recursive_term),
1092+
*is_distinct,
1093+
)
1094+
.map(LogicalPlan::RecursiveQuery)
10901095
}
10911096
LogicalPlan::Analyze(a) => {
10921097
self.assert_no_expressions(expr)?;
@@ -2258,7 +2263,7 @@ impl PartialOrd for EmptyRelation {
22582263
/// intermediate table, then empty the intermediate table.
22592264
///
22602265
/// [Postgres Docs]: https://www.postgresql.org/docs/current/queries-with.html#QUERIES-WITH-RECURSIVE
2261-
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
2266+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
22622267
pub struct RecursiveQuery {
22632268
/// Name of the query
22642269
pub name: String,
@@ -2270,6 +2275,90 @@ pub struct RecursiveQuery {
22702275
/// Should the output of the recursive term be deduplicated (`UNION`) or
22712276
/// not (`UNION ALL`).
22722277
pub is_distinct: bool,
2278+
/// Schema exposed to parent plans after reconciling the static and recursive terms.
2279+
pub schema: DFSchemaRef,
2280+
}
2281+
2282+
impl PartialOrd for RecursiveQuery {
2283+
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2284+
match self.name.partial_cmp(&other.name) {
2285+
Some(Ordering::Equal) => {
2286+
match self.static_term.partial_cmp(&other.static_term) {
2287+
Some(Ordering::Equal) => {
2288+
match self.recursive_term.partial_cmp(&other.recursive_term) {
2289+
Some(Ordering::Equal) => {
2290+
self.is_distinct.partial_cmp(&other.is_distinct)
2291+
}
2292+
cmp => cmp,
2293+
}
2294+
}
2295+
cmp => cmp,
2296+
}
2297+
}
2298+
cmp => cmp,
2299+
}
2300+
// If the query definition compares equal but the derived schema differs,
2301+
// return `None` instead of contradicting `PartialEq` with `Some(Equal)`.
2302+
// TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
2303+
.filter(|cmp| *cmp != Ordering::Equal || self == other)
2304+
}
2305+
}
2306+
2307+
impl RecursiveQuery {
2308+
pub fn try_new(
2309+
name: String,
2310+
static_term: Arc<LogicalPlan>,
2311+
recursive_term: Arc<LogicalPlan>,
2312+
is_distinct: bool,
2313+
) -> Result<Self> {
2314+
let schema =
2315+
recursive_query_output_schema(static_term.schema(), recursive_term.schema())?;
2316+
Ok(Self {
2317+
name,
2318+
static_term,
2319+
recursive_term,
2320+
is_distinct,
2321+
schema,
2322+
})
2323+
}
2324+
}
2325+
2326+
/// Compute a recursive query's output schema by considering both its static and
2327+
/// recursive terms.
2328+
///
2329+
/// Field names, types, and metadata come from the static term. A field is
2330+
/// nullable if either the static or the recursive term produces a nullable
2331+
/// value in that position, matching how `UNION` reconciles branch nullability.
2332+
///
2333+
/// Functional dependencies are intentionally dropped: the recursive term
2334+
/// appends rows that can duplicate values the static term guarantees unique, so
2335+
/// any FDs carried by the static term may not hold over the combined output.
2336+
fn recursive_query_output_schema(
2337+
static_schema: &DFSchemaRef,
2338+
recursive_schema: &DFSchemaRef,
2339+
) -> Result<DFSchemaRef> {
2340+
if static_schema.fields().len() != recursive_schema.fields().len() {
2341+
return Err(DataFusionError::Plan(format!(
2342+
"Non-recursive term and recursive term must have the same number of columns ({} != {})",
2343+
static_schema.fields().len(),
2344+
recursive_schema.fields().len()
2345+
)));
2346+
}
2347+
2348+
let fields = static_schema
2349+
.iter()
2350+
.zip(recursive_schema.fields())
2351+
.map(|((qualifier, static_field), recursive_field)| {
2352+
let nullable = static_field.is_nullable() || recursive_field.is_nullable();
2353+
(
2354+
qualifier.cloned(),
2355+
static_field.as_ref().clone().with_nullable(nullable).into(),
2356+
)
2357+
})
2358+
.collect::<Vec<_>>();
2359+
2360+
DFSchema::new_with_metadata(fields, static_schema.metadata().clone())
2361+
.map(DFSchemaRef::new)
22732362
}
22742363

22752364
/// Values expression. See
@@ -4613,6 +4702,74 @@ mod tests {
46134702
.build()
46144703
}
46154704

4705+
fn recursive_term_scan(name: &str, fields: Vec<Field>) -> Result<Arc<LogicalPlan>> {
4706+
Ok(Arc::new(
4707+
table_scan(Some(name), &Schema::new(fields), None)?.build()?,
4708+
))
4709+
}
4710+
4711+
#[test]
4712+
fn recursive_query_widens_nullability_per_column() -> Result<()> {
4713+
// Column `a` is non-nullable in both terms and must stay non-nullable;
4714+
// column `b` is non-nullable in the static term but nullable in the
4715+
// recursive term, so the output must widen it to nullable.
4716+
let static_term = recursive_term_scan(
4717+
"static",
4718+
vec![
4719+
Field::new("a", DataType::Int32, false),
4720+
Field::new("b", DataType::Int32, false),
4721+
],
4722+
)?;
4723+
let recursive_term = recursive_term_scan(
4724+
"rec",
4725+
vec![
4726+
Field::new("a", DataType::Int32, false),
4727+
Field::new("b", DataType::Int32, true),
4728+
],
4729+
)?;
4730+
4731+
let query =
4732+
RecursiveQuery::try_new("t".to_string(), static_term, recursive_term, false)?;
4733+
4734+
// Names and types are taken from the static term.
4735+
assert_eq!(query.schema.field(0).name(), "a");
4736+
assert_eq!(query.schema.field(1).name(), "b");
4737+
assert_eq!(query.schema.field(0).data_type(), &DataType::Int32);
4738+
assert_eq!(query.schema.field(1).data_type(), &DataType::Int32);
4739+
// Nullability is widened independently per column.
4740+
assert!(!query.schema.field(0).is_nullable());
4741+
assert!(query.schema.field(1).is_nullable());
4742+
// `schema()` returns the widened recursive-query schema.
4743+
assert_eq!(
4744+
LogicalPlan::RecursiveQuery(query.clone()).schema(),
4745+
&query.schema
4746+
);
4747+
Ok(())
4748+
}
4749+
4750+
#[test]
4751+
fn recursive_query_rejects_column_count_mismatch() -> Result<()> {
4752+
let static_term =
4753+
recursive_term_scan("static", vec![Field::new("a", DataType::Int32, false)])?;
4754+
let recursive_term = recursive_term_scan(
4755+
"rec",
4756+
vec![
4757+
Field::new("a", DataType::Int32, false),
4758+
Field::new("b", DataType::Int32, false),
4759+
],
4760+
)?;
4761+
4762+
let err =
4763+
RecursiveQuery::try_new("t".to_string(), static_term, recursive_term, false)
4764+
.unwrap_err();
4765+
assert!(
4766+
err.strip_backtrace()
4767+
.contains("must have the same number of columns"),
4768+
"unexpected error: {err}"
4769+
);
4770+
Ok(())
4771+
}
4772+
46164773
#[test]
46174774
fn test_display_indent() -> Result<()> {
46184775
let plan = display_plan()?;

datafusion/expr/src/logical_plan/tree_node.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,13 +329,18 @@ impl TreeNode for LogicalPlan {
329329
static_term,
330330
recursive_term,
331331
is_distinct,
332+
schema,
332333
}) => (static_term, recursive_term).map_elements(f)?.update_data(
333334
|(static_term, recursive_term)| {
335+
// Ordinary child rewrites preserve derived schemas. Call
336+
// `LogicalPlan::recompute_schema` when child schemas should
337+
// be reconciled again.
334338
LogicalPlan::RecursiveQuery(RecursiveQuery {
335339
name,
336340
static_term,
337341
recursive_term,
338342
is_distinct,
343+
schema,
339344
})
340345
},
341346
),

datafusion/expr/src/planner.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ pub trait ContextProvider {
6161
not_impl_err!("Table Functions are not supported")
6262
}
6363

64-
/// Provides an intermediate table that is used to store the results of a CTE during execution
64+
/// Provides an intermediate table that is used to expose a recursive CTE
65+
/// self-reference during planning and execution.
6566
///
6667
/// CTE stands for "Common Table Expression"
6768
///
@@ -72,6 +73,9 @@ pub trait ContextProvider {
7273
/// of the sql crate (for example [`CteWorkTable`]).
7374
///
7475
/// The [`ContextProvider`] provides a way to "hide" this dependency.
76+
/// The schema argument is the schema to expose for scans of the recursive
77+
/// self-reference, which may be more conservative than the final recursive
78+
/// query output schema.
7579
///
7680
/// [`SqlToRel`]: https://docs.rs/datafusion/latest/datafusion/sql/planner/struct.SqlToRel.html
7781
/// [`CteWorkTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/cte_worktable/struct.CteWorkTable.html

0 commit comments

Comments
 (0)