Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
fee5c5e
feat: add `normalize_batch_schema` function for RecordBatch schema no…
kosiew Apr 29, 2026
59593bb
refactor: improve performance of normalize_batch_schema function
kosiew Apr 29, 2026
9945e7e
feat: reorder imports and clean up comments in common.rs
kosiew Apr 29, 2026
022c4a5
feat: enhance schema normalization in RecordBatch
kosiew Apr 29, 2026
4d5ff4b
refactor: clean up and organize use statements and formatting in mult…
kosiew Apr 29, 2026
fb7ac5f
fix: update documentation to reference crate path for ExecutionPlan s…
kosiew Apr 29, 2026
fded499
feat: add `normalize_batch_schema` helper for zero-copy schema normal…
kosiew Apr 29, 2026
1531abe
chore: update changelog for release 53.1.0, removing improvements sec…
kosiew Apr 29, 2026
c2ef397
feat: Implement recursive CTE schema-name alignment in ProjectionExec
kosiew Apr 30, 2026
7b8be9f
feat: add reusable project_plan_to_schema utility and update Recursiv…
kosiew Apr 30, 2026
6041c1d
fix: update CTE queries to ensure proper ID selection and level calcu…
kosiew Apr 30, 2026
3268381
Fix recursive CTE regression and update tests
kosiew Apr 30, 2026
16df3f0
feat(recursive_query): refactor construction path and update regressi…
kosiew Apr 30, 2026
6646464
feat(validation): change error type for schema-alignment validation f…
kosiew Apr 30, 2026
d5d50fc
feat: enhance mismatch detection and assertions in common.rs and recu…
kosiew Apr 30, 2026
08b5068
fix: update level calculation in recursive CTE to use SUM function
kosiew Apr 30, 2026
870e448
feat: update recursive CTE and schema alignment for nullability
kosiew May 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,7 @@ async fn parquet_recursive_projection_pushdown() -> Result<()> {
RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1
DataSourceExec: file_groups={1 group: [[TMP_DIR/hierarchy.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)]
CoalescePartitionsExec
ProjectionExec: expr=[id@0 + 1 as ns.id + Int64(1), level@1 + 1 as ns.level + Int64(1)]
ProjectionExec: expr=[id@0 + 1 as id, level@1 + 1 as level]
FilterExec: id@0 < 10
RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1
WorkTableExec: name=number_series
Expand Down
258 changes: 251 additions & 7 deletions datafusion/physical-plan/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ use std::fs::metadata;
use std::sync::Arc;

use super::SendableRecordBatchStream;
use crate::expressions::{CastExpr, Column};
use crate::projection::{ProjectionExec, ProjectionExpr};
use crate::stream::RecordBatchReceiverStream;
use crate::{ColumnStatistics, Statistics};
use crate::{ColumnStatistics, ExecutionPlan, Statistics};

use arrow::array::Array;
use arrow::datatypes::Schema;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::stats::Precision;
use datafusion_common::{Result, plan_err};
Expand Down Expand Up @@ -89,6 +91,96 @@ fn build_file_list_recurse(
Ok(())
}

/// Align `input`'s physical plan schema with `expected_schema`.
///
/// Intended for operators that stitch together independently planned children while
/// advertising one fixed output schema. If the schemas already match exactly, `input`
/// is handed back untouched. Otherwise the function first validates that a projection
/// can reproduce `expected_schema` safely and then wraps `input` in a
/// [`ProjectionExec`] that keeps every column at its current position while aliasing
/// it to the corresponding name in `expected_schema`.
///
/// Renaming is done by [`ProjectionExec`] aliases. When an expected field is nullable
/// but the input field is not, nullability is widened with a same-type [`CastExpr`].
/// Anything projection cannot normalize exactly — differing data types, field
/// metadata, schema metadata, or a nullable-to-non-nullable narrowing — is rejected
/// with a plan error.
pub fn project_plan_to_schema(
    input: Arc<dyn ExecutionPlan>,
    expected_schema: &SchemaRef,
) -> Result<Arc<dyn ExecutionPlan>> {
    let input_schema = input.schema();

    // Fast path: identical schemas need no wrapper at all.
    if input_schema.as_ref() == expected_schema.as_ref() {
        return Ok(input);
    }

    // A positional rename only makes sense when both sides have the same arity.
    if input_schema.fields().len() != expected_schema.fields().len() {
        return plan_err!(
            "Cannot project plan to expected schema: expected {} column(s), got {}",
            expected_schema.fields().len(),
            input_schema.fields().len()
        );
    }

    // Projection cannot rewrite schema-level metadata, so it must already agree.
    if input_schema.metadata() != expected_schema.metadata() {
        return plan_err!(
            "Cannot project plan to expected schema: schema metadata differ"
        );
    }

    // Validate every field pair; stop at the first difference projection cannot fix.
    for (i, (input_field, expected_field)) in input_schema
        .fields()
        .iter()
        .zip(expected_schema.fields().iter())
        .enumerate()
    {
        let mismatch = if input_field.data_type() != expected_field.data_type() {
            Some("data type")
        } else if input_field.is_nullable() && !expected_field.is_nullable() {
            // Narrowing nullability could invalidate rows; widening is handled below.
            Some("nullability")
        } else if input_field.metadata() != expected_field.metadata() {
            Some("metadata")
        } else {
            None
        };
        if let Some(mismatch) = mismatch {
            return plan_err!(
                "Cannot project plan column {i} ('{}') to expected output field '{}': \
                 field {mismatch} differs (input field: {:?}, expected field: {:?})",
                input_field.name(),
                expected_field.name(),
                input_field,
                expected_field
            );
        }
    }

    // Build one expression per output column: a bare column reference, or a
    // same-type cast when nullability has to be widened, aliased to the new name.
    let mut projection_exprs = Vec::with_capacity(expected_schema.fields().len());
    for (i, expected_field) in expected_schema.fields().iter().enumerate() {
        let input_field = input_schema.field(i);
        let column = Arc::new(Column::new(input_field.name(), i));
        let needs_widening = !input_field.is_nullable() && expected_field.is_nullable();
        let expr = if needs_widening {
            Arc::new(CastExpr::new_with_target_field(
                column,
                Arc::clone(expected_field),
                None,
            )) as _
        } else {
            column as _
        };
        projection_exprs.push(ProjectionExpr {
            expr,
            alias: expected_field.name().clone(),
        });
    }

    let projection = ProjectionExec::try_new(projection_exprs, input)?;
    // Sanity check: the wrapper must now report exactly the declared schema.
    debug_assert_eq!(projection.schema().as_ref(), expected_schema.as_ref());
    Ok(Arc::new(projection))
}

/// If running in a tokio context spawns the execution of `stream` to a separate task
/// allowing it to execute in parallel with an intermediate buffer of size `buffer`
pub fn spawn_buffered(
Expand Down Expand Up @@ -179,10 +271,7 @@ pub fn compute_record_batch_statistics(
}

/// Checks if the given projection is valid for the given schema.
pub fn can_project(
schema: &arrow::datatypes::SchemaRef,
projection: Option<&[usize]>,
) -> Result<()> {
pub fn can_project(schema: &SchemaRef, projection: Option<&[usize]>) -> Result<()> {
match projection {
Some(columns) => {
if columns
Expand All @@ -207,12 +296,20 @@ pub fn can_project(
#[cfg(test)]
mod tests {
use super::*;
use crate::empty::EmptyExec;
use crate::projection::ProjectionExec;

use std::collections::HashMap;

use arrow::{
array::{Float32Array, Float64Array, UInt64Array},
datatypes::{DataType, Field},
datatypes::{DataType, Field, Schema},
};

/// Test helper: wrap `fields` in a fresh [`Schema`] and return an [`EmptyExec`]
/// plan over it, type-erased to `Arc<dyn ExecutionPlan>`.
fn empty_exec(fields: Vec<Field>) -> Arc<dyn ExecutionPlan> {
    let schema = Arc::new(Schema::new(fields));
    Arc::new(EmptyExec::new(schema))
}

#[test]
fn test_compute_record_batch_statistics_empty() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Expand Down Expand Up @@ -311,4 +408,151 @@ mod tests {
assert_eq!(actual, expected);
Ok(())
}

#[test]
fn project_plan_to_schema_returns_input_when_schema_matches() -> Result<()> {
    // When the plan already produces the expected schema, the very same Arc must
    // come back — no ProjectionExec wrapper, no copy.
    let input = empty_exec(vec![Field::new("value", DataType::Int32, false)]);
    let schema = input.schema();

    let aligned = project_plan_to_schema(Arc::clone(&input), &schema)?;

    assert!(Arc::ptr_eq(&input, &aligned));
    Ok(())
}

#[test]
fn project_plan_to_schema_aliases_field_names_with_projection_exec() -> Result<()> {
    // Same types/nullability, different names: alignment should wrap the input in a
    // ProjectionExec that only aliases the columns.
    let input = empty_exec(vec![
        Field::new("recursive_a", DataType::Int32, false),
        Field::new("recursive_b", DataType::Utf8, true),
    ]);
    let expected_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, true),
    ]));

    let result = project_plan_to_schema(Arc::clone(&input), &expected_schema)?;

    // `Arc<dyn ExecutionPlan>` cannot be downcast directly; the ExecutionPlan trait
    // exposes `as_any()` precisely for this kind of concrete-type inspection.
    let projection = result
        .as_any()
        .downcast_ref::<ProjectionExec>()
        .expect("schema rename should use ProjectionExec");
    assert!(Arc::ptr_eq(projection.input(), &input));
    assert_eq!(projection.schema(), expected_schema);
    assert_eq!(projection.expr()[0].alias, "a");
    assert_eq!(projection.expr()[1].alias, "b");
    Ok(())
}

#[test]
fn project_plan_to_schema_preserves_matching_metadata_while_renaming() -> Result<()> {
    // Field- and schema-level metadata agree on both sides; only the field name
    // differs, so the rename must succeed and the metadata must survive it.
    let field_md = HashMap::from([("key".to_string(), "value".to_string())]);
    let schema_md =
        HashMap::from([("schema-key".to_string(), "schema-value".to_string())]);

    let input_field =
        Field::new("input", DataType::Int32, false).with_metadata(field_md.clone());
    let input: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(Arc::new(
        Schema::new_with_metadata(vec![input_field], schema_md.clone()),
    )));

    let expected_field =
        Field::new("expected", DataType::Int32, false).with_metadata(field_md);
    let expected_schema =
        Arc::new(Schema::new_with_metadata(vec![expected_field], schema_md));

    let result = project_plan_to_schema(input, &expected_schema)?;

    assert_eq!(result.schema(), expected_schema);
    Ok(())
}

#[test]
fn project_plan_to_schema_errors_on_column_count_mismatch() {
    // One input column against two expected columns must be rejected up front.
    let input = empty_exec(vec![Field::new("a", DataType::Int32, false)]);
    let expected_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]));

    let err = project_plan_to_schema(input, &expected_schema).unwrap_err();
    let msg = err.to_string();
    assert!(msg.contains("expected 2 column"), "unexpected error: {msg}");
}

#[test]
fn project_plan_to_schema_errors_on_type_mismatch() {
    // Int32 cannot be silently projected to Float32: types must match exactly.
    let input = empty_exec(vec![Field::new("a", DataType::Int32, false)]);
    let expected_schema =
        Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));

    let err = project_plan_to_schema(input, &expected_schema).unwrap_err();
    let msg = err.to_string();
    assert!(msg.contains("field data type differs"), "unexpected error: {msg}");
}

#[test]
fn project_plan_to_schema_widens_nullability() -> Result<()> {
    // Non-nullable -> nullable is a safe widening; the helper performs it (via a
    // same-type cast) and the output must report exactly the expected schema.
    let input = empty_exec(vec![Field::new("a", DataType::Int32, false)]);
    let expected_schema =
        Arc::new(Schema::new(vec![Field::new("renamed", DataType::Int32, true)]));

    let aligned = project_plan_to_schema(input, &expected_schema)?;

    assert_eq!(aligned.schema(), expected_schema);
    Ok(())
}

#[test]
fn project_plan_to_schema_errors_on_nullability_narrowing() {
    // Nullable -> non-nullable could invalidate rows containing nulls, so the
    // helper must refuse to narrow nullability.
    let input = empty_exec(vec![Field::new("a", DataType::Int32, true)]);
    let expected_schema =
        Arc::new(Schema::new(vec![Field::new("renamed", DataType::Int32, false)]));

    let err = project_plan_to_schema(input, &expected_schema).unwrap_err();
    let msg = err.to_string();
    assert!(msg.contains("field nullability differs"), "unexpected error: {msg}");
}

#[test]
fn project_plan_to_schema_errors_on_field_metadata_mismatch() {
    // Projection cannot rewrite per-field metadata, so differing values must fail.
    let input_md = HashMap::from([("source".to_string(), "input".to_string())]);
    let expected_md = HashMap::from([("source".to_string(), "expected".to_string())]);

    let input = empty_exec(vec![
        Field::new("a", DataType::Int32, false).with_metadata(input_md),
    ]);
    let expected_schema = Arc::new(Schema::new(vec![
        Field::new("renamed", DataType::Int32, false).with_metadata(expected_md),
    ]));

    let err = project_plan_to_schema(input, &expected_schema).unwrap_err();
    let msg = err.to_string();
    assert!(msg.contains("field metadata differs"), "unexpected error: {msg}");
}

#[test]
fn project_plan_to_schema_errors_on_schema_metadata_mismatch() {
    // Schema-level metadata must already agree; projection cannot change it.
    let input_md = HashMap::from([("source".to_string(), "input".to_string())]);
    let expected_md = HashMap::from([("source".to_string(), "expected".to_string())]);

    let input_schema = Arc::new(Schema::new_with_metadata(
        vec![Field::new("a", DataType::Int32, false)],
        input_md,
    ));
    let input: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(input_schema));
    let expected_schema = Arc::new(Schema::new_with_metadata(
        vec![Field::new("renamed", DataType::Int32, false)],
        expected_md,
    ));

    let err = project_plan_to_schema(input, &expected_schema).unwrap_err();
    let msg = err.to_string();
    assert!(msg.contains("schema metadata differ"), "unexpected error: {msg}");
}
}
Loading
Loading