Skip to content

Commit e5920d4

Browse files
committed
feat: Refactor RecursiveQuery to always rebuild through LogicalPlanBuilder
- Updated RecursiveQuery to always use LogicalPlanBuilder::to_recursive_query for rebuilding paths, removing the conditional fast path. - Added regression test to verify that recompute_schema() canonicalizes the recursive child schema with different metadata, ensuring stale schemas are not preserved.
1 parent e11d8fa commit e5920d4

1 file changed

Lines changed: 59 additions & 23 deletions

File tree

  • datafusion/expr/src/logical_plan

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 59 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
5454
use datafusion_common::cse::{NormalizeEq, Normalizeable};
5555
use datafusion_common::format::ExplainFormat;
5656
use datafusion_common::metadata::check_metadata_with_storage_equal;
57-
use datafusion_common::recursive_schema::recursive_query_output_schema;
5857
use datafusion_common::tree_node::{
5958
Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
6059
};
@@ -747,28 +746,13 @@ impl LogicalPlan {
747746
static_term,
748747
recursive_term,
749748
is_distinct,
750-
}) => {
751-
let output_schema = recursive_query_output_schema(
752-
static_term.schema(),
753-
recursive_term.schema(),
754-
)?;
755-
if output_schema != *static_term.schema() {
756-
LogicalPlanBuilder::from(Arc::unwrap_or_clone(static_term))
757-
.to_recursive_query(
758-
name,
759-
Arc::unwrap_or_clone(recursive_term),
760-
is_distinct,
761-
)?
762-
.build()
763-
} else {
764-
Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
765-
name,
766-
static_term,
767-
recursive_term,
768-
is_distinct,
769-
}))
770-
}
771-
}
749+
}) => LogicalPlanBuilder::from(Arc::unwrap_or_clone(static_term))
750+
.to_recursive_query(
751+
name,
752+
Arc::unwrap_or_clone(recursive_term),
753+
is_distinct,
754+
)?
755+
.build(),
772756
LogicalPlan::Analyze(_) => Ok(self),
773757
LogicalPlan::Explain(_) => Ok(self),
774758
LogicalPlan::TableScan(_) => Ok(self),
@@ -5088,6 +5072,58 @@ mod tests {
50885072
"#);
50895073
}
50905074

5075+
#[test]
5076+
fn recompute_schema_rebuilds_recursive_query_even_when_output_schema_matches() {
5077+
let static_schema = DFSchema::from_unqualified_fields(
5078+
vec![Field::new("n", DataType::Int32, false)].into(),
5079+
HashMap::new(),
5080+
)
5081+
.unwrap();
5082+
let recursive_schema = DFSchema::from_unqualified_fields(
5083+
vec![Field::new("n", DataType::Int32, false)].into(),
5084+
HashMap::from([("recursive".to_string(), "metadata".to_string())]),
5085+
)
5086+
.unwrap();
5087+
5088+
let static_term = Arc::new(LogicalPlan::Values(Values {
5089+
schema: Arc::new(static_schema.clone()),
5090+
values: vec![vec![Expr::Literal(ScalarValue::Int32(Some(0)), None)]],
5091+
}));
5092+
let recursive_term = Arc::new(LogicalPlan::Values(Values {
5093+
schema: Arc::new(recursive_schema),
5094+
values: vec![vec![Expr::Literal(ScalarValue::Int32(Some(1)), None)]],
5095+
}));
5096+
5097+
assert_ne!(
5098+
static_term.schema().metadata(),
5099+
recursive_term.schema().metadata()
5100+
);
5101+
5102+
let plan = LogicalPlan::RecursiveQuery(RecursiveQuery {
5103+
name: "cte".to_string(),
5104+
static_term,
5105+
recursive_term,
5106+
is_distinct: false,
5107+
});
5108+
5109+
let recomputed = plan.recompute_schema().unwrap();
5110+
5111+
let LogicalPlan::RecursiveQuery(RecursiveQuery {
5112+
static_term,
5113+
recursive_term,
5114+
..
5115+
}) = recomputed
5116+
else {
5117+
panic!("recompute_schema should preserve the recursive query shape");
5118+
};
5119+
5120+
assert_eq!(static_term.schema(), recursive_term.schema());
5121+
assert_eq!(
5122+
recursive_term.schema().metadata(),
5123+
static_term.schema().metadata()
5124+
);
5125+
}
5126+
50915127
#[test]
50925128
fn test_nullable_schema_after_grouping_set() {
50935129
let schema = Schema::new(vec![

0 commit comments

Comments
 (0)