Commit 739e147

kosiew and Copilot authored

Add reusable plan-time schema alignment helper and apply to RecursiveQueryExec (#21912)
## Which issue does this PR close?

* Closes #21910.

## Rationale for this change

Physical plans in DataFusion can expose schemas that differ from their declared output schema, particularly when combining independently planned branches such as in recursive CTEs. This mismatch can lead to inconsistencies observed by downstream operators or consumers that rely on field names or schema metadata.

Previously, schema alignment for recursive queries was handled at the `RecordBatch` level during execution, which can obscure contract violations in the physical plan and make behavior harder to audit. This change introduces a reusable execution-layer helper to align schemas at plan construction time, ensuring that child plans conform to the expected schema before execution.

## What changes are included in this PR?

* Introduce `project_plan_to_schema` in `datafusion/physical-plan/src/common.rs`:
  * Returns the input plan unchanged when schemas match.
  * Applies a `ProjectionExec` to align field names when schemas are positionally compatible.
  * Validates column count, data types, nullability, and metadata before applying projection.
  * Produces clear errors when alignment is not possible.
* Update `RecursiveQueryExec`:
  * Apply `project_plan_to_schema` to the recursive term during construction.
  * Remove batch-level schema rebinding logic from `RecursiveQueryStream`.
* Adjust tests and expected plans to reflect consistent field naming:
  * Updated recursive CTE tests and explain output expectations.

## Are these changes tested?

Yes. The following tests are included:

* In `common.rs`:
  * `project_plan_to_schema_returns_input_when_schema_matches`
  * `project_plan_to_schema_aliases_field_names_with_projection_exec`
  * `project_plan_to_schema_preserves_matching_metadata_while_renaming`
  * `project_plan_to_schema_errors_on_column_count_mismatch`
  * `project_plan_to_schema_errors_on_type_mismatch`
  * `project_plan_to_schema_errors_on_nullability_mismatch`
  * `project_plan_to_schema_errors_on_field_metadata_mismatch`
  * `project_plan_to_schema_errors_on_schema_metadata_mismatch`
* In `recursive_query.rs`:
  * `recursive_query_exec_projects_recursive_term_to_reconciled_schema`
  * `recursive_query_exec_rejects_nullability_mismatch`
* Updates to existing sqllogictest cases in `cte.slt` and explain plan expectations.

## Are there any user-facing changes?

No direct user-facing API changes are introduced. However, physical plans for recursive queries now consistently expose the declared schema at plan time, which may result in more consistent field names in explain plans and downstream consumers.

## LLM-generated code disclosure

This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested.

Co-authored-by: Copilot <copilot@github.com>
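As a rough illustration of the alignment contract described above, the checks the helper performs (column count, per-field data type, nullability, and metadata, with schema-level metadata compared up front) can be sketched in plain Python. This is an illustrative model only, not DataFusion code; the `check_alignable` function and the dict-based field representation are invented for the example:

```python
# Illustrative sketch of project_plan_to_schema's validation rules.
# Each field is modeled as a dict: {"name", "type", "nullable", "metadata"}.

def check_alignable(input_fields, expected_fields, input_md=None, expected_md=None):
    """Raise ValueError if a positional projection cannot safely align the schemas."""
    if len(input_fields) != len(expected_fields):
        raise ValueError(
            f"expected {len(expected_fields)} column(s), got {len(input_fields)}"
        )
    if (input_md or {}) != (expected_md or {}):
        raise ValueError("schema metadata differ")
    for i, (inp, exp) in enumerate(zip(input_fields, expected_fields)):
        if inp["type"] != exp["type"]:
            raise ValueError(f"column {i}: field data type differs")
        if inp["nullable"] and not exp["nullable"]:
            # Widening (non-nullable -> nullable) is allowed via a same-type
            # cast; narrowing cannot be normalized by a projection.
            raise ValueError(f"column {i}: field nullability differs")
        if inp.get("metadata", {}) != exp.get("metadata", {}):
            raise ValueError(f"column {i}: field metadata differs")
```

Note that only names are renamed and nullability is only ever widened, mirroring the rule that the projection must reproduce the expected schema exactly.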
Parent: 2c7af17 · Commit: 739e147

4 files changed: 340 additions & 27 deletions

datafusion/core/tests/sql/explain_analyze.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -1019,7 +1019,7 @@ async fn parquet_recursive_projection_pushdown() -> Result<()> {
 RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1
 DataSourceExec: file_groups={1 group: [[TMP_DIR/hierarchy.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)]
 CoalescePartitionsExec
-ProjectionExec: expr=[id@0 + 1 as ns.id + Int64(1), level@1 + 1 as ns.level + Int64(1)]
+ProjectionExec: expr=[id@0 + 1 as id, level@1 + 1 as level]
 FilterExec: id@0 < 10
 RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1
 WorkTableExec: name=number_series
```
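For context, the plan above is the expected explain output for a recursive CTE over `number_series`. A query of roughly the same shape can be run against SQLite's `WITH RECURSIVE` support; SQLite here is only a stand-in to show the query semantics, not DataFusion, and the exact test query may differ:

```python
import sqlite3

# Recursive CTE of the same general shape as the number_series test:
# start at (1, 1) and step both columns until id reaches 10.
con = sqlite3.connect(":memory:")
rows = con.execute(
    """
    WITH RECURSIVE number_series(id, level) AS (
        SELECT 1, 1
        UNION ALL
        SELECT id + 1, level + 1 FROM number_series WHERE id < 10
    )
    SELECT id, level FROM number_series
    """
).fetchall()
print(rows[:3])  # -> [(1, 1), (2, 2), (3, 3)]
```

The recursive term's projected expressions (`id + 1`, `level + 1`) are exactly the ones the plan-time alignment now renames to the declared output fields `id` and `level`.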

datafusion/physical-plan/src/common.rs

Lines changed: 251 additions & 7 deletions
```diff
@@ -22,11 +22,13 @@ use std::fs::metadata;
 use std::sync::Arc;
 
 use super::SendableRecordBatchStream;
+use crate::expressions::{CastExpr, Column};
+use crate::projection::{ProjectionExec, ProjectionExpr};
 use crate::stream::RecordBatchReceiverStream;
-use crate::{ColumnStatistics, Statistics};
+use crate::{ColumnStatistics, ExecutionPlan, Statistics};
 
 use arrow::array::Array;
-use arrow::datatypes::Schema;
+use arrow::datatypes::{Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use datafusion_common::stats::Precision;
 use datafusion_common::{Result, plan_err};
```
```diff
@@ -88,6 +90,96 @@ fn build_file_list_recurse(
     Ok(())
 }
 
+/// Align `input`'s physical plan schema with `expected_schema`.
+///
+/// This helper is intended for operators that combine independently planned children but
+/// expose a single declared output schema. It returns `input` unchanged when schemas already
+/// match exactly. Otherwise, it validates that projection can safely produce the expected
+/// schema, then wraps `input` in a [`ProjectionExec`] that keeps columns in their existing
+/// positional order and aliases them to `expected_schema`'s field names.
+///
+/// [`ProjectionExec`] can rename fields. When the expected field is nullable and the input
+/// field is not, this helper also widens nullability with a same-type [`CastExpr`]. It rejects
+/// differences that projection cannot safely normalize exactly, such as data type, metadata,
+/// schema metadata, and nullability narrowing.
+pub fn project_plan_to_schema(
+    input: Arc<dyn ExecutionPlan>,
+    expected_schema: &SchemaRef,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    let input_schema = input.schema();
+    if input_schema.as_ref() == expected_schema.as_ref() {
+        return Ok(input);
+    }
+
+    if input_schema.fields().len() != expected_schema.fields().len() {
+        return plan_err!(
+            "Cannot project plan to expected schema: expected {} column(s), got {}",
+            expected_schema.fields().len(),
+            input_schema.fields().len()
+        );
+    }
+
+    if input_schema.metadata() != expected_schema.metadata() {
+        return plan_err!(
+            "Cannot project plan to expected schema: schema metadata differ"
+        );
+    }
+
+    if let Some((i, input_field, expected_field, mismatch)) = input_schema
+        .fields()
+        .iter()
+        .zip(expected_schema.fields().iter())
+        .enumerate()
+        .find_map(|(i, (input_field, expected_field))| {
+            if input_field.data_type() != expected_field.data_type() {
+                Some((i, input_field, expected_field, "data type"))
+            } else if input_field.is_nullable() && !expected_field.is_nullable() {
+                Some((i, input_field, expected_field, "nullability"))
+            } else if input_field.metadata() != expected_field.metadata() {
+                Some((i, input_field, expected_field, "metadata"))
+            } else {
+                None
+            }
+        })
+    {
+        return plan_err!(
+            "Cannot project plan column {i} ('{}') to expected output field '{}': \
+             field {mismatch} differs (input field: {:?}, expected field: {:?})",
+            input_field.name(),
+            expected_field.name(),
+            input_field,
+            expected_field
+        );
+    }
+
+    let projection_exprs = expected_schema
+        .fields()
+        .iter()
+        .enumerate()
+        .map(|(i, expected_field)| {
+            let input_field = input_schema.field(i);
+            let column = Arc::new(Column::new(input_field.name(), i));
+            let expr = if !input_field.is_nullable() && expected_field.is_nullable() {
+                Arc::new(CastExpr::new_with_target_field(
+                    column,
+                    Arc::clone(expected_field),
+                    None,
+                )) as _
+            } else {
+                column as _
+            };
+            ProjectionExpr {
+                expr,
+                alias: expected_field.name().clone(),
+            }
+        })
+        .collect::<Vec<_>>();
+
+    let projection = ProjectionExec::try_new(projection_exprs, input)?;
+    debug_assert_eq!(projection.schema().as_ref(), expected_schema.as_ref());
+    Ok(Arc::new(projection))
+}
+
 /// If running in a tokio context spawns the execution of `stream` to a separate task
 /// allowing it to execute in parallel with an intermediate buffer of size `buffer`
 pub fn spawn_buffered(
```
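The expression-building step in the hunk above has two cases per column: a plain column reference when nullability already matches, or a same-type widening cast when the input field is non-nullable and the expected field is nullable, each aliased to the expected name. A compact Python sketch of that decision (the string forms of the expressions are invented for illustration, not DataFusion syntax):

```python
# Hypothetical sketch of the projection-building step: the schemas are assumed
# to have already passed validation, so only renaming and nullability widening
# remain. Fields are modeled as (name, nullable) pairs.

def build_projection(input_fields, expected_fields):
    """Return one expression string per column, positionally aligned."""
    exprs = []
    for i, ((in_name, in_nullable), (out_name, out_nullable)) in enumerate(
        zip(input_fields, expected_fields)
    ):
        expr = f"col({in_name}@{i})"
        if not in_nullable and out_nullable:
            # Widening: same data type, declared nullability relaxed.
            expr = f"cast_nullable({expr})"
        exprs.append(f"{expr} AS {out_name}")
    return exprs
```

For example, aligning `(recursive_a: non-null, recursive_b: nullable)` to `(a: nullable, b: nullable)` casts the first column and passes the second through, renaming both.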
```diff
@@ -178,10 +270,7 @@ pub fn compute_record_batch_statistics(
 }
 
 /// Checks if the given projection is valid for the given schema.
-pub fn can_project(
-    schema: &arrow::datatypes::SchemaRef,
-    projection: Option<&[usize]>,
-) -> Result<()> {
+pub fn can_project(schema: &SchemaRef, projection: Option<&[usize]>) -> Result<()> {
     match projection {
         Some(columns) => {
             if columns
```
```diff
@@ -206,12 +295,20 @@ pub fn can_project(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::empty::EmptyExec;
+    use crate::projection::ProjectionExec;
+
+    use std::collections::HashMap;
 
     use arrow::{
         array::{Float32Array, Float64Array, UInt64Array},
-        datatypes::{DataType, Field},
+        datatypes::{DataType, Field, Schema},
     };
 
+    fn empty_exec(fields: Vec<Field>) -> Arc<dyn ExecutionPlan> {
+        Arc::new(EmptyExec::new(Arc::new(Schema::new(fields))))
+    }
+
     #[test]
     fn test_compute_record_batch_statistics_empty() -> Result<()> {
         let schema = Arc::new(Schema::new(vec![
@@ -310,4 +407,151 @@ mod tests {
         assert_eq!(actual, expected);
         Ok(())
     }
+
+    #[test]
+    fn project_plan_to_schema_returns_input_when_schema_matches() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "value",
+            DataType::Int32,
+            false,
+        )]));
+        let input: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(Arc::clone(&schema)));
+
+        let result = project_plan_to_schema(Arc::clone(&input), &schema)?;
+
+        assert!(Arc::ptr_eq(&input, &result));
+        Ok(())
+    }
+
+    #[test]
+    fn project_plan_to_schema_aliases_field_names_with_projection_exec() -> Result<()> {
+        let input = empty_exec(vec![
+            Field::new("recursive_a", DataType::Int32, false),
+            Field::new("recursive_b", DataType::Utf8, true),
+        ]);
+        let expected_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Utf8, true),
+        ]));
+
+        let result = project_plan_to_schema(Arc::clone(&input), &expected_schema)?;
+
+        let projection = result
+            .as_any()
+            .downcast_ref::<ProjectionExec>()
+            .expect("schema rename should use ProjectionExec");
+        assert!(Arc::ptr_eq(projection.input(), &input));
+        assert_eq!(projection.schema(), expected_schema);
+        assert_eq!(projection.expr()[0].alias, "a");
+        assert_eq!(projection.expr()[1].alias, "b");
+        Ok(())
+    }
+
+    #[test]
+    fn project_plan_to_schema_preserves_matching_metadata_while_renaming() -> Result<()> {
+        let field_metadata = HashMap::from([("key".to_string(), "value".to_string())]);
+        let schema_metadata =
+            HashMap::from([("schema-key".to_string(), "schema-value".to_string())]);
+        let input_schema = Arc::new(Schema::new_with_metadata(
+            vec![
+                Field::new("input", DataType::Int32, false)
+                    .with_metadata(field_metadata.clone()),
+            ],
+            schema_metadata.clone(),
+        ));
+        let input: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(input_schema));
+        let expected_schema = Arc::new(Schema::new_with_metadata(
+            vec![
+                Field::new("expected", DataType::Int32, false)
+                    .with_metadata(field_metadata),
+            ],
+            schema_metadata,
+        ));
+
+        let result = project_plan_to_schema(input, &expected_schema)?;
+
+        assert_eq!(result.schema(), expected_schema);
+        Ok(())
+    }
+
+    #[test]
+    fn project_plan_to_schema_errors_on_column_count_mismatch() {
+        let input = empty_exec(vec![Field::new("a", DataType::Int32, false)]);
+        let expected_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]));
+
+        let err = project_plan_to_schema(input, &expected_schema).unwrap_err();
+        assert!(err.to_string().contains("expected 2 column"));
+    }
+
+    #[test]
+    fn project_plan_to_schema_errors_on_type_mismatch() {
+        let input = empty_exec(vec![Field::new("a", DataType::Int32, false)]);
+        let expected_schema =
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
+
+        let err = project_plan_to_schema(input, &expected_schema).unwrap_err();
+        assert!(err.to_string().contains("field data type differs"));
+    }
+
+    #[test]
+    fn project_plan_to_schema_widens_nullability() -> Result<()> {
+        let input = empty_exec(vec![Field::new("a", DataType::Int32, false)]);
+        let expected_schema = Arc::new(Schema::new(vec![Field::new(
+            "renamed",
+            DataType::Int32,
+            true,
+        )]));
+
+        let result = project_plan_to_schema(input, &expected_schema)?;
+
+        assert_eq!(result.schema(), expected_schema);
+        Ok(())
+    }
+
+    #[test]
+    fn project_plan_to_schema_errors_on_nullability_narrowing() {
+        let input = empty_exec(vec![Field::new("a", DataType::Int32, true)]);
+        let expected_schema = Arc::new(Schema::new(vec![Field::new(
+            "renamed",
+            DataType::Int32,
+            false,
+        )]));
+
+        let err = project_plan_to_schema(input, &expected_schema).unwrap_err();
+        assert!(err.to_string().contains("field nullability differs"));
+    }
+
+    #[test]
+    fn project_plan_to_schema_errors_on_field_metadata_mismatch() {
+        let input =
+            empty_exec(vec![Field::new("a", DataType::Int32, false).with_metadata(
+                HashMap::from([("source".to_string(), "input".to_string())]),
+            )]);
+        let expected_schema = Arc::new(Schema::new(vec![
+            Field::new("renamed", DataType::Int32, false).with_metadata(HashMap::from([
+                ("source".to_string(), "expected".to_string()),
+            ])),
+        ]));
+
+        let err = project_plan_to_schema(input, &expected_schema).unwrap_err();
+        assert!(err.to_string().contains("field metadata differs"));
+    }
+
+    #[test]
+    fn project_plan_to_schema_errors_on_schema_metadata_mismatch() {
+        let input_schema = Arc::new(Schema::new_with_metadata(
+            vec![Field::new("a", DataType::Int32, false)],
+            HashMap::from([("source".to_string(), "input".to_string())]),
+        ));
+        let input: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(input_schema));
+        let expected_schema = Arc::new(Schema::new_with_metadata(
+            vec![Field::new("renamed", DataType::Int32, false)],
+            HashMap::from([("source".to_string(), "expected".to_string())]),
+        ));
+
+        let err = project_plan_to_schema(input, &expected_schema).unwrap_err();
+        assert!(err.to_string().contains("schema metadata differ"));
+    }
 }
```
