Skip to content

Commit 1eb5206

Browse files
authored
feat: Integrate CastColumnExpr into PhysicalExprAdapter (#20269)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #20163 ## Rationale for this change Ensure physical expression schema adaptation uses struct-aware casting so schema evolution for nested structs preserves field ordering, fills missing fields with NULLs, and propagates logical field metadata/nullability correctly. <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? Wire schema adaptation to emit struct-aware cast expressions for column/field mismatches (including metadata and nullability), extend expression property propagation so optimizer logic continues to work, and add unit + integration coverage for struct projection and filtering under schema evolution. <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? Yes, new unit tests were added as well. <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? No <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 4dbb449 commit 1eb5206

File tree

6 files changed

+352
-11
lines changed

6 files changed

+352
-11
lines changed

datafusion/core/tests/parquet/expr_adapter.rs

Lines changed: 144 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717

1818
use std::sync::Arc;
1919

20-
use arrow::array::{RecordBatch, record_batch};
21-
use arrow_schema::{DataType, Field, Schema, SchemaRef};
20+
use arrow::array::{
21+
Array, ArrayRef, BooleanArray, Int32Array, Int64Array, RecordBatch, StringArray,
22+
StructArray, record_batch,
23+
};
24+
use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
2225
use bytes::{BufMut, BytesMut};
2326
use datafusion::assert_batches_eq;
2427
use datafusion::common::Result;
@@ -320,6 +323,145 @@ async fn test_physical_expr_adapter_with_non_null_defaults() {
320323
assert_batches_eq!(expected, &batches);
321324
}
322325

326+
#[tokio::test]
327+
async fn test_struct_schema_evolution_projection_and_filter() -> Result<()> {
328+
use std::collections::HashMap;
329+
330+
// Physical struct: {id: Int32, name: Utf8}
331+
let physical_struct_fields: Fields = vec![
332+
Arc::new(Field::new("id", DataType::Int32, false)),
333+
Arc::new(Field::new("name", DataType::Utf8, true)),
334+
]
335+
.into();
336+
337+
let struct_array = StructArray::new(
338+
physical_struct_fields.clone(),
339+
vec![
340+
Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
341+
Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef,
342+
],
343+
None,
344+
);
345+
346+
let physical_schema = Arc::new(Schema::new(vec![Field::new(
347+
"s",
348+
DataType::Struct(physical_struct_fields),
349+
true,
350+
)]));
351+
352+
let batch =
353+
RecordBatch::try_new(Arc::clone(&physical_schema), vec![Arc::new(struct_array)])?;
354+
355+
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
356+
let store_url = ObjectStoreUrl::parse("memory://").unwrap();
357+
write_parquet(batch, store.clone(), "struct_evolution.parquet").await;
358+
359+
// Logical struct: {id: Int64?, name: Utf8?, extra: Boolean?} + metadata
360+
let logical_struct_fields: Fields = vec![
361+
Arc::new(Field::new("id", DataType::Int64, true)),
362+
Arc::new(Field::new("name", DataType::Utf8, true)),
363+
Arc::new(Field::new("extra", DataType::Boolean, true).with_metadata(
364+
HashMap::from([("nested_meta".to_string(), "1".to_string())]),
365+
)),
366+
]
367+
.into();
368+
369+
let table_schema = Arc::new(Schema::new(vec![
370+
Field::new("s", DataType::Struct(logical_struct_fields), false)
371+
.with_metadata(HashMap::from([("top_meta".to_string(), "1".to_string())])),
372+
]));
373+
374+
let mut cfg = SessionConfig::new()
375+
.with_collect_statistics(false)
376+
.with_parquet_pruning(false)
377+
.with_parquet_page_index_pruning(false);
378+
cfg.options_mut().execution.parquet.pushdown_filters = true;
379+
380+
let ctx = SessionContext::new_with_config(cfg);
381+
ctx.register_object_store(store_url.as_ref(), Arc::clone(&store));
382+
383+
let listing_table_config =
384+
ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap())
385+
.infer_options(&ctx.state())
386+
.await
387+
.unwrap()
388+
.with_schema(table_schema.clone())
389+
.with_expr_adapter_factory(Arc::new(DefaultPhysicalExprAdapterFactory));
390+
391+
let table = ListingTable::try_new(listing_table_config).unwrap();
392+
ctx.register_table("t", Arc::new(table)).unwrap();
393+
394+
let batches = ctx
395+
.sql("SELECT s FROM t")
396+
.await
397+
.unwrap()
398+
.collect()
399+
.await
400+
.unwrap();
401+
assert_eq!(batches.len(), 1);
402+
403+
// Verify top-level metadata propagation
404+
let output_schema = batches[0].schema();
405+
let s_field = output_schema.field_with_name("s").unwrap();
406+
assert_eq!(
407+
s_field.metadata().get("top_meta").map(String::as_str),
408+
Some("1")
409+
);
410+
411+
// Verify nested struct type/field propagation + values
412+
let s_array = batches[0]
413+
.column(0)
414+
.as_any()
415+
.downcast_ref::<StructArray>()
416+
.expect("expected struct array");
417+
418+
let id_array = s_array
419+
.column_by_name("id")
420+
.expect("id column")
421+
.as_any()
422+
.downcast_ref::<Int64Array>()
423+
.expect("id should be cast to Int64");
424+
assert_eq!(id_array.values(), &[1, 2, 3]);
425+
426+
let extra_array = s_array.column_by_name("extra").expect("extra column");
427+
assert_eq!(extra_array.null_count(), 3);
428+
429+
// Verify nested field metadata propagation
430+
let extra_field = match s_field.data_type() {
431+
DataType::Struct(fields) => fields
432+
.iter()
433+
.find(|f| f.name() == "extra")
434+
.expect("extra field"),
435+
other => panic!("expected struct type for s, got {other:?}"),
436+
};
437+
assert_eq!(
438+
extra_field
439+
.metadata()
440+
.get("nested_meta")
441+
.map(String::as_str),
442+
Some("1")
443+
);
444+
445+
// Smoke test: filtering on a missing nested field evaluates correctly
446+
let filtered = ctx
447+
.sql("SELECT get_field(s, 'extra') AS extra FROM t WHERE get_field(s, 'extra') IS NULL")
448+
.await
449+
.unwrap()
450+
.collect()
451+
.await
452+
.unwrap();
453+
assert_eq!(filtered.len(), 1);
454+
assert_eq!(filtered[0].num_rows(), 3);
455+
let extra = filtered[0]
456+
.column(0)
457+
.as_any()
458+
.downcast_ref::<BooleanArray>()
459+
.expect("extra should be a boolean array");
460+
assert_eq!(extra.null_count(), 3);
461+
462+
Ok(())
463+
}
464+
323465
/// Test demonstrating that a single PhysicalExprAdapterFactory instance can be
324466
/// reused across multiple ListingTable instances.
325467
///

datafusion/physical-expr-adapter/src/schema_rewriter.rs

Lines changed: 92 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use arrow::compute::can_cast_types;
2929
use arrow::datatypes::{DataType, Field, SchemaRef};
3030
use datafusion_common::{
3131
Result, ScalarValue, exec_err,
32+
metadata::FieldMetadata,
3233
nested_struct::validate_struct_compatibility,
3334
tree_node::{Transformed, TransformedResult, TreeNode},
3435
};
@@ -368,7 +369,10 @@ impl DefaultPhysicalExprAdapterRewriter {
368369
};
369370

370371
let null_value = ScalarValue::Null.cast_to(logical_struct_field.data_type())?;
371-
Ok(Some(expressions::lit(null_value)))
372+
Ok(Some(Arc::new(expressions::Literal::new_with_metadata(
373+
null_value,
374+
Some(FieldMetadata::from(logical_struct_field.as_ref())),
375+
))))
372376
}
373377

374378
fn rewrite_column(
@@ -416,24 +420,33 @@ impl DefaultPhysicalExprAdapterRewriter {
416420
// If the column is missing from the physical schema fill it in with nulls.
417421
// For a different behavior, provide a custom `PhysicalExprAdapter` implementation.
418422
let null_value = ScalarValue::Null.cast_to(logical_field.data_type())?;
419-
return Ok(Transformed::yes(expressions::lit(null_value)));
423+
return Ok(Transformed::yes(Arc::new(
424+
expressions::Literal::new_with_metadata(
425+
null_value,
426+
Some(FieldMetadata::from(logical_field)),
427+
),
428+
)));
420429
}
421430
};
422431
let physical_field = self.physical_file_schema.field(physical_column_index);
423432

424-
if column.index() == physical_column_index
425-
&& logical_field.data_type() == physical_field.data_type()
426-
{
433+
if column.index() == physical_column_index && logical_field == physical_field {
427434
return Ok(Transformed::no(expr));
428435
}
429436

430437
let column = self.resolve_column(column, physical_column_index)?;
431438

432-
if logical_field.data_type() == physical_field.data_type() {
433-
// If the data types match, we can use the column as is
439+
if logical_field == physical_field {
440+
// If the fields match (including metadata/nullability), we can use the column as is
434441
return Ok(Transformed::yes(Arc::new(column)));
435442
}
436443

444+
if logical_field.data_type() == physical_field.data_type() {
445+
// The data type matches, but the field metadata / nullability differs.
446+
// Emit a CastColumnExpr so downstream schema construction uses the logical field.
447+
return self.create_cast_column_expr(column, logical_field);
448+
}
449+
437450
// We need to cast the column to the logical data type
438451
// TODO: add optimization to move the cast from the column to literal expressions in the case of `col = 123`
439452
// since that's much cheaper to evalaute.
@@ -693,6 +706,43 @@ mod tests {
693706
assert!(result.as_any().downcast_ref::<CastColumnExpr>().is_some());
694707
}
695708

709+
#[test]
710+
fn test_rewrite_column_with_metadata_or_nullability_mismatch() -> Result<()> {
711+
use std::collections::HashMap;
712+
713+
let physical_schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
714+
let logical_schema =
715+
Schema::new(vec![Field::new("a", DataType::Int64, false).with_metadata(
716+
HashMap::from([("logical_meta".to_string(), "1".to_string())]),
717+
)]);
718+
719+
let factory = DefaultPhysicalExprAdapterFactory;
720+
let adapter = factory
721+
.create(Arc::new(logical_schema), Arc::new(physical_schema.clone()))
722+
.unwrap();
723+
724+
let result = adapter.rewrite(Arc::new(Column::new("a", 0)))?;
725+
let cast = result
726+
.as_any()
727+
.downcast_ref::<CastColumnExpr>()
728+
.expect("Expected CastColumnExpr");
729+
730+
assert_eq!(cast.target_field().data_type(), &DataType::Int64);
731+
assert!(!cast.target_field().is_nullable());
732+
assert_eq!(
733+
cast.target_field()
734+
.metadata()
735+
.get("logical_meta")
736+
.map(String::as_str),
737+
Some("1")
738+
);
739+
740+
// Ensure the expression reports the logical nullability regardless of input schema
741+
assert!(!result.nullable(physical_schema.as_ref())?);
742+
743+
Ok(())
744+
}
745+
696746
#[test]
697747
fn test_rewrite_multi_column_expr_with_type_cast() {
698748
let (physical_schema, logical_schema) = create_test_schema();
@@ -865,6 +915,41 @@ mod tests {
865915
Ok(())
866916
}
867917

918+
#[test]
919+
fn test_rewrite_missing_column_propagates_metadata() -> Result<()> {
920+
use std::collections::HashMap;
921+
922+
let physical_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
923+
let logical_schema = Schema::new(vec![
924+
Field::new("a", DataType::Int32, false),
925+
Field::new("b", DataType::Utf8, true).with_metadata(HashMap::from([(
926+
"logical_meta".to_string(),
927+
"1".to_string(),
928+
)])),
929+
]);
930+
931+
let factory = DefaultPhysicalExprAdapterFactory;
932+
let adapter = factory
933+
.create(Arc::new(logical_schema), Arc::new(physical_schema.clone()))
934+
.unwrap();
935+
936+
let result = adapter.rewrite(Arc::new(Column::new("b", 1)))?;
937+
let literal = result
938+
.as_any()
939+
.downcast_ref::<Literal>()
940+
.expect("Expected literal expression");
941+
942+
assert_eq!(
943+
literal
944+
.return_field(physical_schema.as_ref())?
945+
.metadata()
946+
.get("logical_meta")
947+
.map(String::as_str),
948+
Some("1")
949+
);
950+
Ok(())
951+
}
952+
868953
#[test]
869954
fn test_rewrite_missing_column_non_nullable_error() {
870955
let physical_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

datafusion/physical-expr/src/equivalence/properties/dependency.rs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ mod tests {
390390
convert_to_sort_reqs, create_test_params, create_test_schema, parse_sort_expr,
391391
};
392392
use crate::equivalence::{ProjectionMapping, convert_to_sort_exprs};
393-
use crate::expressions::{BinaryExpr, CastExpr, Column, col};
393+
use crate::expressions::{BinaryExpr, CastColumnExpr, CastExpr, Column, col};
394394
use crate::projection::tests::output_schema;
395395
use crate::{ConstExpr, EquivalenceProperties, ScalarFunctionExpr};
396396

@@ -1019,6 +1019,44 @@ mod tests {
10191019
Ok(())
10201020
}
10211021

1022+
#[test]
1023+
fn test_eliminate_redundant_monotonic_sorts_cast_column_expr() -> Result<()> {
1024+
let schema = Arc::new(Schema::new(vec![
1025+
Field::new("a", DataType::Date32, true),
1026+
Field::new("b", DataType::Utf8, true),
1027+
Field::new("c", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
1028+
]));
1029+
let mut properties = EquivalenceProperties::new(Arc::clone(&schema));
1030+
properties.reorder(
1031+
["a", "b", "c"]
1032+
.into_iter()
1033+
.map(|c| PhysicalSortExpr::new_default(col(c, schema.as_ref()).unwrap())),
1034+
)?;
1035+
1036+
let col_a = col("a", schema.as_ref())?;
1037+
let col_b = col("b", schema.as_ref())?;
1038+
let col_c = col("c", schema.as_ref())?;
1039+
1040+
let cast_c = Arc::new(CastColumnExpr::new(
1041+
Arc::clone(&col_c),
1042+
Arc::new(Field::new(
1043+
"c",
1044+
DataType::Timestamp(TimeUnit::Nanosecond, None),
1045+
true,
1046+
)),
1047+
Arc::new(Field::new("c", DataType::Date32, true)),
1048+
None,
1049+
)) as Arc<dyn PhysicalExpr>;
1050+
1051+
properties.add_equal_conditions(cast_c, Arc::clone(&col_a))?;
1052+
properties.add_constants(std::iter::once(ConstExpr::from(col_b)))?;
1053+
1054+
let required = vec![PhysicalSortExpr::new_default(col("c", &schema)?)];
1055+
assert!(properties.ordering_satisfy(required)?);
1056+
1057+
Ok(())
1058+
}
1059+
10221060
#[test]
10231061
fn test_ordering_equivalence_with_lex_monotonic_concat() -> Result<()> {
10241062
let schema = Arc::new(Schema::new(vec![

datafusion/physical-expr/src/equivalence/properties/mod.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use self::dependency::{
3333
use crate::equivalence::{
3434
AcrossPartitions, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping,
3535
};
36-
use crate::expressions::{CastExpr, Column, Literal, with_new_schema};
36+
use crate::expressions::{CastColumnExpr, CastExpr, Column, Literal, with_new_schema};
3737
use crate::{
3838
ConstExpr, LexOrdering, LexRequirement, PhysicalExpr, PhysicalSortExpr,
3939
PhysicalSortRequirement,
@@ -858,6 +858,18 @@ impl EquivalenceProperties {
858858
sort_expr.options,
859859
));
860860
}
861+
} else if let Some(cast_expr) =
862+
r_expr.as_any().downcast_ref::<CastColumnExpr>()
863+
{
864+
let cast_type = cast_expr.target_field().data_type();
865+
if cast_expr.expr().eq(&sort_expr.expr)
866+
&& CastExpr::check_bigger_cast(cast_type, &expr_type)
867+
{
868+
result.push(PhysicalSortExpr::new(
869+
r_expr,
870+
sort_expr.options,
871+
));
872+
}
861873
}
862874
}
863875
result.push(sort_expr);

0 commit comments

Comments
 (0)