Skip to content

Commit e11d8fa

Browse files
committed
feat: improve query planning and schema handling
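- builder.rs: skip the identity projection in `plan_with_schema` by returning the plan unchanged when the requested schema is already the same as the plan's schema.
- physical_planner.rs: replace the split (declare-then-assign) temporary binding with an Option<DFSchema> that falls back to the original logical input schema.
- recursive_schema.rs: compute the nullability booleans once per field and reuse them, for efficiency.
- cte.rs: simplify the formatting of the `matches!` call in `has_work_table_reference` by splitting it across lines.
- physical_planner.rs tests: add the `c1_i32_required_schema` helper for the schema that was repeated across the aggregate schema-check tests.
- plan.rs: invert the branch in the recursive query rebuild path to reduce clutter.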
1 parent ec1c4ca commit e11d8fa

5 files changed

Lines changed: 46 additions & 39 deletions

File tree

datafusion/common/src/recursive_schema.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,13 +120,16 @@ fn widen_dfschema_nullability_with_fields<'a>(
120120
return Ok(None);
121121
}
122122

123-
widened_nullability |= !base_field.is_nullable() && widening_field.is_nullable();
123+
let base_nullable = base_field.is_nullable();
124+
let widening_nullable = widening_field.is_nullable();
125+
let output_nullable = base_nullable || widening_nullable;
126+
widened_nullability |= !base_nullable && widening_nullable;
124127
fields.push((
125128
qualifier.cloned(),
126129
base_field
127130
.as_ref()
128131
.clone()
129-
.with_nullable(base_field.is_nullable() || widening_field.is_nullable())
132+
.with_nullable(output_nullable)
130133
.into(),
131134
));
132135
}

datafusion/core/src/physical_planner.rs

Lines changed: 25 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -998,22 +998,22 @@ impl DefaultPhysicalPlanner {
998998
let input_exec = children.one()?;
999999
let physical_input_schema = input_exec.schema();
10001000
let logical_input_schema = input.as_ref().schema();
1001-
let reconciled_logical_schema;
1002-
let logical_input_schema = if schema_satisfied_by(
1003-
logical_input_schema.inner(),
1004-
&physical_input_schema,
1005-
) || !contains_recursive_query_input(input)
1006-
{
1007-
logical_input_schema
1008-
} else if let Some(schema) = reconcile_dfschema_with_schema_nullability(
1009-
logical_input_schema,
1010-
&physical_input_schema,
1011-
)? {
1012-
reconciled_logical_schema = schema;
1013-
&reconciled_logical_schema
1001+
let needs_recursive_reconciliation =
1002+
!schema_satisfied_by(
1003+
logical_input_schema.inner(),
1004+
&physical_input_schema,
1005+
) && contains_recursive_query_input(input);
1006+
let reconciled_logical_schema = if needs_recursive_reconciliation {
1007+
reconcile_dfschema_with_schema_nullability(
1008+
logical_input_schema,
1009+
&physical_input_schema,
1010+
)?
10141011
} else {
1015-
logical_input_schema
1012+
None
10161013
};
1014+
let logical_input_schema = reconciled_logical_schema
1015+
.as_ref()
1016+
.unwrap_or(logical_input_schema);
10171017
let physical_input_schema_from_logical = logical_input_schema.inner();
10181018

10191019
if !options.execution.skip_physical_aggregate_schema_check
@@ -4762,6 +4762,10 @@ digraph {
47624762
}
47634763
}
47644764

4765+
fn c1_i32_required_schema() -> SchemaRef {
4766+
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]))
4767+
}
4768+
47654769
/// Attempts to plan a query with potentially mismatched schemas.
47664770
async fn plan_with_schemas(
47674771
logical_schema: SchemaRef,
@@ -4783,8 +4787,7 @@ digraph {
47834787
// It then panics on unimplemented error in NoOpExecutionPlan.
47844788
#[should_panic(expected = "NoOpExecutionPlan")]
47854789
async fn test_aggregate_schema_check_passes() {
4786-
let schema =
4787-
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
4790+
let schema = c1_i32_required_schema();
47884791

47894792
plan_with_schemas(
47904793
Arc::clone(&schema),
@@ -4797,8 +4800,7 @@ digraph {
47974800

47984801
#[tokio::test]
47994802
async fn test_aggregate_schema_mismatch_metadata() {
4800-
let logical_schema =
4801-
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
4803+
let logical_schema = c1_i32_required_schema();
48024804
let physical_schema = Arc::new(
48034805
Schema::new(vec![Field::new("c1", DataType::Int32, false)])
48044806
.with_metadata(HashMap::from([("key".into(), "value".into())])),
@@ -4817,8 +4819,7 @@ digraph {
48174819

48184820
#[tokio::test]
48194821
async fn test_aggregate_schema_mismatch_field_count() {
4820-
let logical_schema =
4821-
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
4822+
let logical_schema = c1_i32_required_schema();
48224823
let physical_schema = Arc::new(Schema::new(vec![
48234824
Field::new("c1", DataType::Int32, false),
48244825
Field::new("c2", DataType::Int32, false),
@@ -4837,8 +4838,7 @@ digraph {
48374838

48384839
#[tokio::test]
48394840
async fn test_aggregate_schema_mismatch_field_name() {
4840-
let logical_schema =
4841-
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
4841+
let logical_schema = c1_i32_required_schema();
48424842
let physical_schema = Arc::new(Schema::new(vec![Field::new(
48434843
"different_name",
48444844
DataType::Int32,
@@ -4858,8 +4858,7 @@ digraph {
48584858

48594859
#[tokio::test]
48604860
async fn test_aggregate_schema_mismatch_field_type() {
4861-
let logical_schema =
4862-
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
4861+
let logical_schema = c1_i32_required_schema();
48634862
let physical_schema =
48644863
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int64, false)]));
48654864

@@ -4876,8 +4875,7 @@ digraph {
48764875

48774876
#[tokio::test]
48784877
async fn test_aggregate_schema_mismatch_field_nullability() {
4879-
let logical_schema =
4880-
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
4878+
let logical_schema = c1_i32_required_schema();
48814879
let physical_schema =
48824880
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
48834881

@@ -4894,8 +4892,7 @@ digraph {
48944892

48954893
#[tokio::test]
48964894
async fn test_aggregate_schema_mismatch_field_metadata() {
4897-
let logical_schema =
4898-
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
4895+
let logical_schema = c1_i32_required_schema();
48994896
let physical_schema = Arc::new(Schema::new(vec![
49004897
Field::new("c1", DataType::Int32, false)
49014898
.with_metadata(HashMap::from([("key".into(), "value".into())])),

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ use indexmap::IndexSet;
6868
pub const UNNAMED_TABLE: &str = "?table?";
6969

7070
fn plan_with_schema(plan: LogicalPlan, schema: DFSchemaRef) -> Result<LogicalPlan> {
71+
if schema == *plan.schema() {
72+
return Ok(plan);
73+
}
74+
7175
match plan {
7276
LogicalPlan::Projection(Projection { expr, input, .. }) => {
7377
Projection::try_new_with_schema(expr, input, schema)

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -752,21 +752,21 @@ impl LogicalPlan {
752752
static_term.schema(),
753753
recursive_term.schema(),
754754
)?;
755-
if output_schema == *static_term.schema() {
756-
Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
757-
name,
758-
static_term,
759-
recursive_term,
760-
is_distinct,
761-
}))
762-
} else {
755+
if output_schema != *static_term.schema() {
763756
LogicalPlanBuilder::from(Arc::unwrap_or_clone(static_term))
764757
.to_recursive_query(
765758
name,
766759
Arc::unwrap_or_clone(recursive_term),
767760
is_distinct,
768761
)?
769762
.build()
763+
} else {
764+
Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
765+
name,
766+
static_term,
767+
recursive_term,
768+
is_distinct,
769+
}))
770770
}
771771
}
772772
LogicalPlan::Analyze(_) => Ok(self),

datafusion/sql/src/cte.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,10 @@ fn has_work_table_reference(
190190
work_table_source: &Arc<dyn TableSource>,
191191
) -> bool {
192192
plan.exists(|node| {
193-
Ok(matches!(node, LogicalPlan::TableScan(scan) if Arc::ptr_eq(&scan.source, work_table_source)))
193+
Ok(matches!(
194+
node,
195+
LogicalPlan::TableScan(scan) if Arc::ptr_eq(&scan.source, work_table_source)
196+
))
194197
})
195198
// Closure always returns Ok
196199
.unwrap()

0 commit comments

Comments
 (0)