
Commit fd97799

Make Physical CastExpr Field-aware and unify cast semantics across physical expressions (#20814)
## Which issue does this PR close?

* Part of #20164

## Rationale for this change

Physical `CastExpr` previously stored only a target `DataType`. This caused field-level semantics (name, nullability, and metadata) to be lost when casts were represented in the physical layer. In contrast, logical expressions already carry this information through `FieldRef`. This mismatch created several issues:

* Physical and logical cast representations diverged in how they preserve schema semantics.
* Struct casting logic behaved differently depending on whether the cast was represented as `CastExpr` or `CastColumnExpr`.
* Downstream components (such as schema rewriting and ordering equivalence analysis) required additional branching and duplicated logic.

Making `CastExpr` field-aware aligns the physical representation with logical semantics and enables consistent schema propagation across execution planning and expression evaluation.

## What changes are included in this PR?

This PR introduces field-aware semantics to `CastExpr` and simplifies several areas that previously relied on type-only casting. Key changes include:

1. **Field-aware CastExpr**
   * Replace the `cast_type: DataType` field with `target_field: FieldRef`.
   * Add a `new_with_target_field` constructor to explicitly construct field-aware casts.
   * Keep the existing `new(expr, DataType)` constructor as a compatibility shim that creates a canonical field.
2. **Return-field and nullability behavior**
   * `return_field` now returns the full `target_field`, preserving name, nullability, and metadata.
   * `nullable()` now derives its result from the resolved target field rather than the input expression.
   * Add compatibility logic for legacy type-only casts to preserve previous behavior.
3. **Struct cast validation improvements**
   * Struct-to-struct casting now validates compatibility using field information before execution.
   * Planning-time validation prevents unsupported casts from reaching execution.
4. **Shared cast property logic**
   * Introduce shared helper functions (`cast_expr_properties`, `is_order_preserving_cast_family`) for determining ordering preservation.
   * Reuse this logic in both `CastExpr` and `CastColumnExpr` to avoid duplicated implementations.
5. **Schema rewriter improvements**
   * Refactor physical column resolution into `resolve_physical_column`.
   * Simplify cast insertion logic when logical and physical fields differ.
   * Pass explicit physical and logical fields to cast creation for improved correctness.
6. **Ordering equivalence simplification**
   * Introduce a `substitute_cast_like_ordering` helper to unify handling of `CastExpr` and `CastColumnExpr` in ordering equivalence analysis.
7. **Additional unit tests**
   * Validate metadata propagation through `return_field`.
   * Verify nullability behavior for field-aware casts.
   * Ensure legacy type-only casts preserve existing semantics.
   * Test struct-cast validation with nested field semantics.

## Are these changes tested?

Yes. New unit tests were added in `physical-expr/src/expressions/cast.rs` to verify:

* Metadata propagation through field-aware casts
* Correct nullability behavior derived from the target field
* Backward compatibility with legacy type-only constructors
* Struct cast compatibility validation using nested fields

Existing tests continue to pass and validate compatibility with the previous API behavior.

## Are there any user-facing changes?

There are no direct user-facing behavior changes. This change primarily improves internal schema semantics and consistency in the physical expression layer. Existing APIs remain compatible through the legacy constructor that accepts only a `DataType`.

## LLM-generated code disclosure

This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested.
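The field-aware design described in point 1 can be sketched with simplified stand-in types. This is a minimal model, not the actual `arrow`/DataFusion definitions: the `DataType`, `Field`, and `CastExpr` shapes below are trimmed to the essentials, and details such as the placeholder field name `"c"` in the compatibility shim are illustrative assumptions.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

// Simplified stand-ins for Arrow's DataType and Field.
#[derive(Clone, Debug, PartialEq)]
enum DataType {
    Int32,
    Int64,
}

#[derive(Clone, Debug, PartialEq)]
struct Field {
    name: String,
    data_type: DataType,
    nullable: bool,
    metadata: BTreeMap<String, String>,
}

type FieldRef = Arc<Field>;

// Field-aware cast: stores the full target field instead of just a DataType,
// so name, nullability, and metadata survive the physical layer.
struct CastExpr {
    target_field: FieldRef,
}

impl CastExpr {
    // Explicit field-aware constructor.
    fn new_with_target_field(target_field: FieldRef) -> Self {
        Self { target_field }
    }

    // Compatibility shim mirroring the legacy type-only constructor:
    // it fabricates a canonical field around the bare type.
    fn new(cast_type: DataType) -> Self {
        Self::new_with_target_field(Arc::new(Field {
            name: "c".to_string(), // hypothetical canonical placeholder name
            data_type: cast_type,
            nullable: true, // legacy casts treated conservatively as nullable
            metadata: BTreeMap::new(),
        }))
    }

    // return_field yields the whole target field, not just its type.
    fn return_field(&self) -> FieldRef {
        Arc::clone(&self.target_field)
    }

    // nullable() derives from the resolved target field.
    fn nullable(&self) -> bool {
        self.target_field.nullable
    }
}
```

The point of the sketch is the asymmetry between the two constructors: the field-aware path preserves everything the caller provides, while the legacy path can only invent a canonical field around a bare `DataType`.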
1 parent 84a22ea commit fd97799

File tree

4 files changed, +319 -144 lines changed


datafusion/physical-expr-adapter/src/schema_rewriter.rs

Lines changed: 61 additions & 61 deletions
```diff
@@ -26,7 +26,7 @@ use std::sync::Arc;
 
 use arrow::array::RecordBatch;
 use arrow::compute::can_cast_types;
-use arrow::datatypes::{DataType, Field, SchemaRef};
+use arrow::datatypes::{DataType, Field, FieldRef, SchemaRef};
 use datafusion_common::{
     Result, ScalarValue, exec_err,
     metadata::FieldMetadata,
@@ -404,71 +404,76 @@ impl DefaultPhysicalExprAdapterRewriter {
             }
         };
 
-        // Check if the column exists in the physical schema
-        let physical_column_index = match self
-            .physical_file_schema
-            .index_of(column.name())
-        {
-            Ok(index) => index,
-            Err(_) => {
-                if !logical_field.is_nullable() {
-                    return exec_err!(
-                        "Non-nullable column '{}' is missing from the physical schema",
-                        column.name()
-                    );
-                }
-                // If the column is missing from the physical schema fill it in with nulls.
-                // For a different behavior, provide a custom `PhysicalExprAdapter` implementation.
-                let null_value = ScalarValue::Null.cast_to(logical_field.data_type())?;
-                return Ok(Transformed::yes(Arc::new(
-                    expressions::Literal::new_with_metadata(
-                        null_value,
-                        Some(FieldMetadata::from(logical_field)),
-                    ),
-                )));
+        let Some((resolved_column, physical_field)) =
+            self.resolve_physical_column(column)?
+        else {
+            if !logical_field.is_nullable() {
+                return exec_err!(
+                    "Non-nullable column '{}' is missing from the physical schema",
+                    column.name()
+                );
             }
+            // If the column is missing from the physical schema fill it in with nulls.
+            // For a different behavior, provide a custom `PhysicalExprAdapter` implementation.
+            let null_value = ScalarValue::Null.cast_to(logical_field.data_type())?;
+            return Ok(Transformed::yes(Arc::new(
+                expressions::Literal::new_with_metadata(
+                    null_value,
+                    Some(FieldMetadata::from(logical_field)),
+                ),
+            )));
         };
-        let physical_field = self.physical_file_schema.field(physical_column_index);
 
-        if column.index() == physical_column_index && logical_field == physical_field {
+        if resolved_column.index() == column.index()
+            && logical_field == physical_field.as_ref()
+        {
             return Ok(Transformed::no(expr));
         }
 
-        let column = self.resolve_column(column, physical_column_index)?;
-
-        if logical_field == physical_field {
+        if logical_field == physical_field.as_ref() {
             // If the fields match (including metadata/nullability), we can use the column as is
-            return Ok(Transformed::yes(Arc::new(column)));
+            return Ok(Transformed::yes(Arc::new(resolved_column)));
         }
 
-        if logical_field.data_type() == physical_field.data_type() {
-            // The data type matches, but the field metadata / nullability differs.
-            // Emit a CastColumnExpr so downstream schema construction uses the logical field.
-            return self.create_cast_column_expr(column, logical_field);
-        }
-
-        // We need to cast the column to the logical data type
+        // We need a cast expression whenever the logical and physical fields differ,
+        // whether that difference is only metadata/nullability or also data type.
         // TODO: add optimization to move the cast from the column to literal expressions in the case of `col = 123`
         // since that's much cheaper to evalaute.
         // See https://github.com/apache/datafusion/issues/15780#issuecomment-2824716928
-        self.create_cast_column_expr(column, logical_field)
+        self.create_cast_column_expr(resolved_column, physical_field, logical_field)
     }
 
-    /// Resolves a column expression, handling index and type mismatches.
-    ///
-    /// Returns the appropriate Column expression when the column's index or data type
-    /// don't match the physical schema. Assumes that the early-exit case (both index
-    /// and type match) has already been checked by the caller.
-    fn resolve_column(
+    /// Resolves a logical column to the corresponding physical column and field.
+    fn resolve_physical_column(
         &self,
         column: &Column,
-        physical_column_index: usize,
-    ) -> Result<Column> {
-        if column.index() == physical_column_index {
-            Ok(column.clone())
+    ) -> Result<Option<(Column, FieldRef)>> {
+        // The physical schema adaptation step intentionally resolves columns by **name first**
+        // rather than trusting the incoming index. This mirrors what the old refactoring
+        // did before `resolve_physical_column()` was extracted: the planner might hand us a
+        // `Column` whose `index` field is stale (e.g. after projection/rename rewrites), so
+        // resolving by name ensures we match the correct physical slot. Once we know the
+        // proper index we rebuild the `Column` with `new_with_schema` so callers can rely
+        // on `column.index()` later without having to re-query the schema.
+        let Ok(physical_column_index) = self.physical_file_schema.index_of(column.name())
+        else {
+            return Ok(None);
+        };
+
+        let column = if column.index() == physical_column_index {
+            column.clone()
         } else {
-            Column::new_with_schema(column.name(), self.physical_file_schema.as_ref())
-        }
+            Column::new_with_schema(column.name(), self.physical_file_schema.as_ref())?
+        };
+
+        Ok(Some((
+            column,
+            Arc::new(
+                self.physical_file_schema
                    .field(physical_column_index)
                    .clone(),
            ),
+        )))
     }
 
     /// Validates type compatibility and creates a CastColumnExpr if needed.
@@ -479,35 +484,29 @@ impl DefaultPhysicalExprAdapterRewriter {
     fn create_cast_column_expr(
         &self,
         column: Column,
+        physical_field: FieldRef,
         logical_field: &Field,
     ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
-        // Look up the column index in the physical schema by name to ensure correctness.
-        let physical_column_index = self.physical_file_schema.index_of(column.name())?;
-        let actual_physical_field =
-            self.physical_file_schema.field(physical_column_index);
-
         // For struct types, use validate_struct_compatibility which handles:
         // - Missing fields in source (filled with nulls)
         // - Extra fields in source (ignored)
         // - Recursive validation of nested structs
         // For non-struct types, use Arrow's can_cast_types
-        match (actual_physical_field.data_type(), logical_field.data_type()) {
+        match (physical_field.data_type(), logical_field.data_type()) {
             (DataType::Struct(physical_fields), DataType::Struct(logical_fields)) => {
                 validate_struct_compatibility(
                     physical_fields.as_ref(),
                     logical_fields.as_ref(),
                 )?;
             }
             _ => {
-                let is_compatible = can_cast_types(
-                    actual_physical_field.data_type(),
-                    logical_field.data_type(),
-                );
+                let is_compatible =
+                    can_cast_types(physical_field.data_type(), logical_field.data_type());
                 if !is_compatible {
                     return exec_err!(
                         "Cannot cast column '{}' from '{}' (physical data type) to '{}' (logical data type)",
                         column.name(),
-                        actual_physical_field.data_type(),
+                        physical_field.data_type(),
                         logical_field.data_type()
                     );
                 }
@@ -516,7 +515,7 @@ impl DefaultPhysicalExprAdapterRewriter {
 
         let cast_expr = Arc::new(CastColumnExpr::new(
             Arc::new(column),
-            Arc::new(actual_physical_field.clone()),
+            physical_field,
             Arc::new(logical_field.clone()),
             None,
         ));
@@ -1604,6 +1603,7 @@ mod tests {
         let transformed = rewriter
             .create_cast_column_expr(
                 Column::new("a", 0),
+                Arc::new(physical_schema.field_with_name("a").unwrap().clone()),
                 logical_schema.field_with_name("a").unwrap(),
             )
             .unwrap();
```
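The name-first resolution strategy of `resolve_physical_column` in the diff above can be reduced to a toy model. The `Column` and `Schema` types below are hypothetical minimal stand-ins, not DataFusion's actual types; the sketch only shows the resolve-by-name-then-rebuild-index behavior.

```rust
#[derive(Clone, Debug, PartialEq)]
struct Column {
    name: String,
    index: usize,
}

struct Schema {
    fields: Vec<String>, // field names, in physical order
}

impl Schema {
    fn index_of(&self, name: &str) -> Option<usize> {
        self.fields.iter().position(|f| f.as_str() == name)
    }
}

// Resolve by name first: the incoming index may be stale after
// projection/rename rewrites. Returns None when the column is absent,
// letting the caller fall back to null-filling for nullable fields.
fn resolve_physical_column(schema: &Schema, column: &Column) -> Option<Column> {
    let physical_index = schema.index_of(&column.name)?;
    Some(if column.index == physical_index {
        column.clone()
    } else {
        // Rebuild with the correct physical slot so later code can
        // trust `column.index` without re-querying the schema.
        Column {
            name: column.name.clone(),
            index: physical_index,
        }
    })
}
```

A column arriving with a stale index is silently repaired, while a column missing from the physical schema surfaces as `None` instead of an error, mirroring the `Ok(None)` path in the real method.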

datafusion/physical-expr/src/equivalence/properties/mod.rs

Lines changed: 38 additions & 27 deletions
```diff
@@ -39,7 +39,7 @@ use crate::{
     PhysicalSortRequirement,
 };
 
-use arrow::datatypes::SchemaRef;
+use arrow::datatypes::{DataType, SchemaRef};
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{Constraint, Constraints, HashMap, Result, plan_err};
 use datafusion_expr::interval_arithmetic::Interval;
@@ -195,6 +195,39 @@ impl OrderingEquivalenceCache {
 }
 
 impl EquivalenceProperties {
+    /// Helper used by the ordering equivalence rule when considering whether a
+    /// cast-bearing expression can replace an existing sort key without invalidating
+    /// the ordering.
+    ///
+    /// This function handles *both* `CastExpr` (generic cast) and
+    /// `CastColumnExpr` (field-aware cast) because the planner may introduce either
+    /// form during rewrite steps; the core logic is the same in both cases. The
+    /// substitution is only allowed when the cast wraps **the very same child
+    /// expression** that the original sort used (an exact-child-match invariant),
+    /// and the casted type must be a widening/order-preserving conversion
+    /// `CastExpr::check_bigger_cast(...)` ensures. Without those restrictions the
+    /// existing sort order could be violated (e.g. a narrowing cast could collapse
+    /// distinct values together).
+    fn substitute_cast_like_ordering(
+        r_expr: Arc<dyn PhysicalExpr>,
+        sort_expr: &PhysicalSortExpr,
+        expr_type: &DataType,
+    ) -> Option<PhysicalSortExpr> {
+        let (child_expr, cast_type) = if let Some(cast_expr) =
+            r_expr.as_any().downcast_ref::<CastExpr>()
+        {
+            (cast_expr.expr(), cast_expr.cast_type())
+        } else if let Some(cast_expr) = r_expr.as_any().downcast_ref::<CastColumnExpr>() {
+            (cast_expr.expr(), cast_expr.target_field().data_type())
+        } else {
+            return None;
+        };
+
+        (child_expr.eq(&sort_expr.expr)
+            && CastExpr::check_bigger_cast(cast_type, expr_type))
+            .then(|| PhysicalSortExpr::new(r_expr, sort_expr.options))
+    }
+
     /// Creates an empty `EquivalenceProperties` object.
     pub fn new(schema: SchemaRef) -> Self {
         Self {
@@ -844,32 +877,10 @@ impl EquivalenceProperties {
             let expr_type = sort_expr.expr.data_type(schema).unwrap();
             // TODO: Add one-to-one analysis for ScalarFunctions.
             for r_expr in referring_exprs {
-                // We check whether this expression is substitutable.
-                if let Some(cast_expr) =
-                    r_expr.as_any().downcast_ref::<CastExpr>()
-                {
-                    // For casts, we need to know whether the cast
-                    // expression matches:
-                    if cast_expr.expr.eq(&sort_expr.expr)
-                        && cast_expr.is_bigger_cast(&expr_type)
-                    {
-                        result.push(PhysicalSortExpr::new(
-                            r_expr,
-                            sort_expr.options,
-                        ));
-                    }
-                } else if let Some(cast_expr) =
-                    r_expr.as_any().downcast_ref::<CastColumnExpr>()
-                {
-                    let cast_type = cast_expr.target_field().data_type();
-                    if cast_expr.expr().eq(&sort_expr.expr)
-                        && CastExpr::check_bigger_cast(cast_type, &expr_type)
-                    {
-                        result.push(PhysicalSortExpr::new(
-                            r_expr,
-                            sort_expr.options,
-                        ));
-                    }
+                if let Some(substituted) = Self::substitute_cast_like_ordering(
+                    r_expr, &sort_expr, &expr_type,
+                ) {
+                    result.push(substituted);
                 }
             }
             result.push(sort_expr);
```
0 commit comments
