Skip to content

Commit e1ad871

Browse files
authored
Preserve logical cast field semantics during physical lowering with field-aware CastExpr (apache#20836)
## Which issue does this PR close? * Part of apache#20164 --- ## Rationale for this change The current physical planning path for `Expr::Cast` discards logical field information (name, nullability, and metadata) by lowering casts using only the target `DataType`. This results in a loss of semantic fidelity between logical and physical plans, particularly for metadata-bearing fields and same-type casts with explicit field intent. Additionally, the planner previously rejected casts with metadata due to limitations of the type-only casting API, creating inconsistencies with other parts of the system (e.g. adapter-generated expressions). This change introduces a field-aware casting path that preserves logical intent throughout physical lowering, ensuring consistent semantics across planner and adapter outputs. --- ## What changes are included in this PR? * Introduced `cast_with_target_field` to construct `CastExpr` using full `FieldRef` semantics (name, nullability, metadata). * Refactored existing `cast_with_options` to delegate to the new field-aware helper. * Moved `is_default_target_field` to a shared helper function for reuse. * Updated planner (`planner.rs`) to use `cast_with_target_field` instead of type-only casting. * Removed metadata rejection logic during cast lowering. * Ensured same-type casts preserve explicit field semantics unless the target field is default. * Adjusted cast construction to validate compatibility before building expressions. * Exported `cast_with_target_field` for internal planner use. --- ## Are these changes tested? Yes. Added planner-focused unit tests to validate: * Preservation of target field metadata during cast lowering * Correct propagation of nullability semantics * Proper handling of same-type casts with explicit field overrides * No regression for standard type-only casts * Rejection behavior for unsupported extension type casts via `TryCast` These tests ensure both backward compatibility and correctness of the new semantics. --- ## Are there any user-facing changes? Yes, behaviorally (but not API-breaking): * Cast expressions now preserve logical field metadata and nullability in physical plans. * Previously rejected metadata-bearing casts are now supported. * Same-type casts may now produce a `CastExpr` when explicit field semantics are provided. There are no breaking changes to public APIs, but downstream consumers that relied on previous planner behavior (e.g. metadata stripping or cast elision) may observe differences. --- ## LLM-generated code disclosure This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested.
1 parent 8f77a3b commit e1ad871

File tree

5 files changed

+208
-70
lines changed

5 files changed

+208
-70
lines changed

datafusion/physical-expr/src/expressions/cast.rs

Lines changed: 52 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -151,14 +151,8 @@ impl CastExpr {
151151
&self.cast_options
152152
}
153153

154-
fn is_default_target_field(&self) -> bool {
155-
self.target_field.name().is_empty()
156-
&& self.target_field.is_nullable()
157-
&& self.target_field.metadata().is_empty()
158-
}
159-
160154
fn resolved_target_field(&self, input_schema: &Schema) -> Result<FieldRef> {
161-
if self.is_default_target_field() {
155+
if is_default_target_field(&self.target_field) {
162156
self.expr.return_field(input_schema).map(|field| {
163157
Arc::new(
164158
field
@@ -201,6 +195,12 @@ impl CastExpr {
201195
}
202196
}
203197

198+
fn is_default_target_field(target_field: &FieldRef) -> bool {
199+
target_field.name().is_empty()
200+
&& target_field.is_nullable()
201+
&& target_field.metadata().is_empty()
202+
}
203+
204204
pub(crate) fn is_order_preserving_cast_family(
205205
source_type: &DataType,
206206
target_type: &DataType,
@@ -315,26 +315,55 @@ pub fn cast_with_options(
315315
input_schema: &Schema,
316316
cast_type: DataType,
317317
cast_options: Option<CastOptions<'static>>,
318+
) -> Result<Arc<dyn PhysicalExpr>> {
319+
cast_with_target_field(
320+
expr,
321+
input_schema,
322+
cast_type.into_nullable_field_ref(),
323+
cast_options,
324+
)
325+
}
326+
327+
/// Return a PhysicalExpression representing `expr` casted to `target_field`,
328+
/// preserving any explicit field semantics such as name, nullability, and
329+
/// metadata.
330+
///
331+
/// If the input expression already has the same data type, this helper still
332+
/// preserves an explicit `target_field` by constructing a field-aware
333+
/// [`CastExpr`]. Only the default synthesized field created by the legacy
334+
/// type-only API is elided back to the original child expression.
335+
pub fn cast_with_target_field(
336+
expr: Arc<dyn PhysicalExpr>,
337+
input_schema: &Schema,
338+
target_field: FieldRef,
339+
cast_options: Option<CastOptions<'static>>,
318340
) -> Result<Arc<dyn PhysicalExpr>> {
319341
let expr_type = expr.data_type(input_schema)?;
320-
if expr_type == cast_type {
321-
Ok(Arc::clone(&expr))
322-
} else if requires_nested_struct_cast(&expr_type, &cast_type) {
323-
if can_cast_named_struct_types(&expr_type, &cast_type) {
324-
// Allow casts involving structs (including nested inside Lists, Dictionaries,
325-
// etc.) that pass name-based compatibility validation. This validation is
326-
// applied at planning time (now) to fail fast, rather than deferring errors
327-
// to execution time. The name-based casting logic will be executed at runtime
328-
// via ColumnarValue::cast_to.
329-
Ok(Arc::new(CastExpr::new(expr, cast_type, cast_options)))
330-
} else {
331-
not_impl_err!("Unsupported CAST from {expr_type} to {cast_type}")
332-
}
333-
} else if can_cast_types(&expr_type, &cast_type) {
334-
Ok(Arc::new(CastExpr::new(expr, cast_type, cast_options)))
342+
let cast_type = target_field.data_type();
343+
if expr_type == *cast_type && is_default_target_field(&target_field) {
344+
return Ok(Arc::clone(&expr));
345+
}
346+
347+
let can_build_cast = if requires_nested_struct_cast(&expr_type, cast_type) {
348+
// Allow casts involving structs (including nested inside Lists, Dictionaries,
349+
// etc.) that pass name-based compatibility validation. This validation is
350+
// applied at planning time (now) to fail fast, rather than deferring errors
351+
// to execution time. The name-based casting logic will be executed at runtime
352+
// via ColumnarValue::cast_to.
353+
can_cast_named_struct_types(&expr_type, cast_type)
335354
} else {
336-
not_impl_err!("Unsupported CAST from {expr_type} to {cast_type}")
355+
can_cast_types(&expr_type, cast_type)
356+
};
357+
358+
if !can_build_cast {
359+
return not_impl_err!("Unsupported CAST from {expr_type} to {cast_type}");
337360
}
361+
362+
Ok(Arc::new(CastExpr::new_with_target_field(
363+
expr,
364+
target_field,
365+
cast_options,
366+
)))
338367
}
339368

340369
/// Return a PhysicalExpression representing `expr` casted to

datafusion/physical-expr/src/expressions/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,5 @@ pub use no_op::NoOp;
5656
pub use not::{NotExpr, not};
5757
pub use try_cast::{TryCastExpr, try_cast};
5858
pub use unknown_column::UnKnownColumn;
59+
60+
pub(crate) use cast::cast_with_target_field;

datafusion/physical-expr/src/planner.rs

Lines changed: 75 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -288,25 +288,12 @@ pub fn create_physical_expr(
288288
};
289289
Ok(expressions::case(expr, when_then_expr, else_expr)?)
290290
}
291-
Expr::Cast(Cast { expr, field }) => {
292-
if !field.metadata().is_empty() {
293-
let (_, src_field) = expr.to_field(input_dfschema)?;
294-
return plan_err!(
295-
"Cast from {} to {} is not supported",
296-
format_type_and_metadata(
297-
src_field.data_type(),
298-
Some(src_field.metadata()),
299-
),
300-
format_type_and_metadata(field.data_type(), Some(field.metadata()))
301-
);
302-
}
303-
304-
expressions::cast(
305-
create_physical_expr(expr, input_dfschema, execution_props)?,
306-
input_schema,
307-
field.data_type().clone(),
308-
)
309-
}
291+
Expr::Cast(Cast { expr, field }) => expressions::cast_with_target_field(
292+
create_physical_expr(expr, input_dfschema, execution_props)?,
293+
input_schema,
294+
Arc::clone(field),
295+
None,
296+
),
310297
Expr::TryCast(TryCast { expr, field }) => {
311298
if !field.metadata().is_empty() {
312299
let (_, src_field) = expr.to_field(input_dfschema)?;
@@ -445,11 +432,26 @@ pub fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
445432
mod tests {
446433
use arrow::array::{ArrayRef, BooleanArray, RecordBatch, StringArray};
447434
use arrow::datatypes::{DataType, Field};
448-
use datafusion_common::datatype::DataTypeExt;
449435
use datafusion_expr::col;
450436

451437
use super::*;
452438

439+
fn test_cast_schema() -> Schema {
440+
Schema::new(vec![Field::new("a", DataType::Int32, false)])
441+
}
442+
443+
fn lower_cast_expr(expr: &Expr, schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
444+
let df_schema = DFSchema::try_from(schema.clone())?;
445+
create_physical_expr(expr, &df_schema, &ExecutionProps::new())
446+
}
447+
448+
fn as_planner_cast(physical: &Arc<dyn PhysicalExpr>) -> &expressions::CastExpr {
449+
physical
450+
.as_any()
451+
.downcast_ref::<expressions::CastExpr>()
452+
.expect("planner should lower logical CAST to CastExpr")
453+
}
454+
453455
#[test]
454456
fn test_create_physical_expr_scalar_input_output() -> Result<()> {
455457
let expr = col("letter").eq(lit("A"));
@@ -476,36 +478,63 @@ mod tests {
476478
}
477479

478480
#[test]
479-
fn test_cast_to_extension_type() -> Result<()> {
480-
let extension_field_type = Arc::new(
481-
DataType::FixedSizeBinary(16)
482-
.into_nullable_field()
483-
.with_metadata(
484-
[("ARROW:extension:name".to_string(), "arrow.uuid".to_string())]
485-
.into(),
486-
),
481+
fn test_cast_lowering_preserves_target_field_metadata() -> Result<()> {
482+
let schema = test_cast_schema();
483+
let target_field = Arc::new(
484+
Field::new("cast_target", DataType::Int64, true)
485+
.with_metadata([("target_meta".to_string(), "1".to_string())].into()),
487486
);
488-
let expr = lit("3230e5d4-888e-408b-b09b-831f44aa0c58");
489487
let cast_expr = Expr::Cast(Cast::new_from_field(
490-
Box::new(expr.clone()),
491-
Arc::clone(&extension_field_type),
488+
Box::new(col("a")),
489+
Arc::clone(&target_field),
492490
));
493-
let err =
494-
create_physical_expr(&cast_expr, &DFSchema::empty(), &ExecutionProps::new())
495-
.unwrap_err();
496-
assert!(err.message().contains("arrow.uuid"));
497-
498-
let try_cast_expr = Expr::TryCast(TryCast::new_from_field(
499-
Box::new(expr.clone()),
500-
Arc::clone(&extension_field_type),
491+
492+
let physical = lower_cast_expr(&cast_expr, &schema)?;
493+
let cast = as_planner_cast(&physical);
494+
495+
assert_eq!(cast.target_field(), &target_field);
496+
assert_eq!(physical.return_field(&schema)?, target_field);
497+
assert!(physical.nullable(&schema)?);
498+
499+
Ok(())
500+
}
501+
502+
#[test]
503+
fn test_cast_lowering_preserves_standard_cast_semantics() -> Result<()> {
504+
let schema = test_cast_schema();
505+
let cast_expr = Expr::Cast(Cast::new(Box::new(col("a")), DataType::Int64));
506+
507+
let physical = lower_cast_expr(&cast_expr, &schema)?;
508+
let cast = as_planner_cast(&physical);
509+
let returned_field = physical.return_field(&schema)?;
510+
511+
assert_eq!(cast.cast_type(), &DataType::Int64);
512+
assert_eq!(returned_field.name(), "a");
513+
assert_eq!(returned_field.data_type(), &DataType::Int64);
514+
assert!(!physical.nullable(&schema)?);
515+
516+
Ok(())
517+
}
518+
519+
#[test]
520+
fn test_cast_lowering_preserves_same_type_field_semantics() -> Result<()> {
521+
let schema = test_cast_schema();
522+
let target_field = Arc::new(
523+
Field::new("same_type_cast", DataType::Int32, true).with_metadata(
524+
[("target_meta".to_string(), "same-type".to_string())].into(),
525+
),
526+
);
527+
let cast_expr = Expr::Cast(Cast::new_from_field(
528+
Box::new(col("a")),
529+
Arc::clone(&target_field),
501530
));
502-
let err = create_physical_expr(
503-
&try_cast_expr,
504-
&DFSchema::empty(),
505-
&ExecutionProps::new(),
506-
)
507-
.unwrap_err();
508-
assert!(err.message().contains("arrow.uuid"));
531+
532+
let physical = lower_cast_expr(&cast_expr, &schema)?;
533+
let cast = as_planner_cast(&physical);
534+
535+
assert_eq!(cast.target_field(), &target_field);
536+
assert_eq!(physical.return_field(&schema)?, target_field);
537+
assert!(physical.nullable(&schema)?);
509538

510539
Ok(())
511540
}

datafusion/sqllogictest/src/test_context.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,17 @@ use arrow::array::{
2828
LargeStringArray, StringArray, TimestampNanosecondArray, UnionArray,
2929
};
3030
use arrow::buffer::ScalarBuffer;
31-
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UnionFields};
31+
use arrow::datatypes::{
32+
DataType, Field, FieldRef, Schema, SchemaRef, TimeUnit, UnionFields,
33+
};
3234
use arrow::record_batch::RecordBatch;
3335
use datafusion::catalog::{
3436
CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider, Session,
3537
};
3638
use datafusion::common::{DataFusionError, Result, not_impl_err};
3739
use datafusion::functions::math::abs;
3840
use datafusion::logical_expr::async_udf::{AsyncScalarUDF, AsyncScalarUDFImpl};
41+
use datafusion::logical_expr::planner::TypePlanner;
3942
use datafusion::logical_expr::{
4043
ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
4144
Volatility, create_udf,
@@ -54,6 +57,7 @@ use datafusion::common::cast::as_float64_array;
5457
use datafusion::execution::SessionStateBuilder;
5558
use datafusion::execution::runtime_env::RuntimeEnv;
5659
use log::info;
60+
use sqlparser::ast;
5761
use tempfile::TempDir;
5862

5963
/// Context for running tests
@@ -64,6 +68,23 @@ pub struct TestContext {
6468
test_dir: Option<TempDir>,
6569
}
6670

71+
#[derive(Debug)]
72+
struct SqlLogicTestTypePlanner;
73+
74+
impl TypePlanner for SqlLogicTestTypePlanner {
75+
fn plan_type_field(&self, sql_type: &ast::DataType) -> Result<Option<FieldRef>> {
76+
match sql_type {
77+
ast::DataType::Uuid => Ok(Some(Arc::new(
78+
Field::new("", DataType::FixedSizeBinary(16), true).with_metadata(
79+
[("ARROW:extension:name".to_string(), "arrow.uuid".to_string())]
80+
.into(),
81+
),
82+
))),
83+
_ => Ok(None),
84+
}
85+
}
86+
}
87+
6788
impl TestContext {
6889
pub fn new(ctx: SessionContext) -> Self {
6990
Self {
@@ -92,6 +113,14 @@ impl TestContext {
92113
state_builder = state_builder.with_spark_features();
93114
}
94115

116+
if matches!(
117+
relative_path.file_name().and_then(|name| name.to_str()),
118+
Some("cast_extension_type_metadata.slt")
119+
) {
120+
state_builder =
121+
state_builder.with_type_planner(Arc::new(SqlLogicTestTypePlanner));
122+
}
123+
95124
let state = state_builder.build();
96125

97126
let mut test_ctx = TestContext::new(SessionContext::new_with_state(state));
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# Regression tests for logical CAST targets that carry explicit field metadata.
19+
20+
query ?T
21+
SELECT
22+
CAST(
23+
arrow_cast(X'00010203040506070809000102030506', 'FixedSizeBinary(16)')
24+
AS UUID
25+
),
26+
arrow_metadata(
27+
CAST(
28+
arrow_cast(X'00010203040506070809000102030506', 'FixedSizeBinary(16)')
29+
AS UUID
30+
),
31+
'ARROW:extension:name'
32+
);
33+
----
34+
00010203040506070809000102030506 arrow.uuid
35+
36+
query ?T
37+
SELECT
38+
CAST(raw AS UUID),
39+
arrow_metadata(CAST(raw AS UUID), 'ARROW:extension:name')
40+
FROM (
41+
VALUES (
42+
arrow_cast(X'00010203040506070809000102030506', 'FixedSizeBinary(16)')
43+
)
44+
) AS uuids(raw);
45+
----
46+
00010203040506070809000102030506 arrow.uuid
47+
48+
statement error DataFusion error: Optimizer rule 'simplify_expressions' failed[\s\S]*TryCast from FixedSizeBinary\(16\) to FixedSizeBinary\(16\)<\{"ARROW:extension:name": "arrow\.uuid"\}> is not supported
49+
SELECT TRY_CAST(arrow_cast(X'00010203040506070809000102030506', 'FixedSizeBinary(16)') AS UUID);

0 commit comments

Comments
 (0)