Skip to content

Commit fc88e2d

Browse files
authored
feat: Generate transform in WriteContext w.r.t column mapping (delta-io#1862)
## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta-kernel-rs/pull/1862/files) to review incremental changes. - [**stack/correct-transform**](delta-io#1862) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/1862/files)] - [stack/cm-partition](delta-io#1870) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/1870/files/6b36ecc5022a5be295d128fe6b66192b74d39614..76691de21b81671453a8e52a1fd58b58fdcbf1fe)] - [stack/support-cm-write](delta-io#1863) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/1863/files/76691de21b81671453a8e52a1fd58b58fdcbf1fe..1e938eb9de2f5f8a52943ae794984585867f31a5)] - [stack/support-CM-with-flag](delta-io#1910) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/1910/files/6b36ecc5022a5be295d128fe6b66192b74d39614..7e184b835db7ac1142cb878524561847007dc297)] --------- ## What changes are proposed in this pull request? Add with_dropped_field_if_exists to Transform for optionally dropping fields that may not exist in the input, and use it in generate_logical_to_physical to drop partition columns without erroring when the input data doesn't contain them. Change `WriteContext::logical_to_physical` will now correctly rename nested struct fields to their physical names under column mapping. <!-- **Uncomment** this section if there are any changes affecting public APIs. Else, **delete** this section. ### This PR affects the following public APIs If there are breaking changes, please ensure the `breaking-changes` label gets added by CI, and describe why the changes are needed. Note that _new_ public APIs are not considered breaking. --> ## How was this change tested? Added test to verify the renaming successful and `with_dropped_field_if_exists` works as expected
1 parent cdc0151 commit fc88e2d

4 files changed

Lines changed: 324 additions & 100 deletions

File tree

kernel/src/engine/arrow_expression/evaluate_expression.rs

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,10 +199,15 @@ fn evaluate_transform_expression(
199199
}
200200
}
201201

202-
// Verify that all field transforms were used
203-
if used_field_transforms != transform.field_transforms.len() {
202+
// Verify that all non-optional field transforms were used
203+
let required_count = transform
204+
.field_transforms
205+
.values()
206+
.filter(|ft| !ft.optional)
207+
.count();
208+
if used_field_transforms < required_count {
204209
return Err(Error::generic(
205-
"Some field transforms reference invalid input field names",
210+
"Some non-optional field transforms reference invalid input field names",
206211
));
207212
}
208213

@@ -987,6 +992,69 @@ mod tests {
987992
.contains("Data type is required"));
988993
}
989994

995+
#[test]
996+
fn test_drop_field_if_exists_present() {
997+
let batch = create_test_batch();
998+
let transform = Transform::new_top_level().with_dropped_field_if_exists("a");
999+
let output_schema = StructType::new_unchecked(vec![
1000+
StructField::not_null("b", DataType::INTEGER),
1001+
StructField::not_null("c", DataType::INTEGER),
1002+
]);
1003+
let expr = Expr::Transform(transform);
1004+
let result = evaluate_expression(
1005+
&expr,
1006+
&batch,
1007+
Some(&DataType::Struct(Box::new(output_schema))),
1008+
)
1009+
.unwrap();
1010+
let result = result.as_any().downcast_ref::<StructArray>().unwrap();
1011+
validate_i32_column(result, 0, &[10, 20, 30]);
1012+
validate_i32_column(result, 1, &[100, 200, 300]);
1013+
}
1014+
1015+
#[test]
1016+
fn test_drop_field_if_exists_missing() {
1017+
let batch = create_test_batch();
1018+
let transform = Transform::new_top_level().with_dropped_field_if_exists("nonexistent");
1019+
let output_schema = StructType::new_unchecked(vec![
1020+
StructField::not_null("a", DataType::INTEGER),
1021+
StructField::not_null("b", DataType::INTEGER),
1022+
StructField::not_null("c", DataType::INTEGER),
1023+
]);
1024+
let expr = Expr::Transform(transform);
1025+
let result = evaluate_expression(
1026+
&expr,
1027+
&batch,
1028+
Some(&DataType::Struct(Box::new(output_schema))),
1029+
)
1030+
.unwrap();
1031+
let result = result.as_any().downcast_ref::<StructArray>().unwrap();
1032+
validate_i32_column(result, 0, &[1, 2, 3]);
1033+
validate_i32_column(result, 1, &[10, 20, 30]);
1034+
validate_i32_column(result, 2, &[100, 200, 300]);
1035+
}
1036+
1037+
#[test]
1038+
fn test_drop_field_non_optional_missing_still_errors() {
1039+
let batch = create_test_batch();
1040+
let transform = Transform::new_top_level().with_dropped_field("nonexistent");
1041+
let output_schema = StructType::new_unchecked(vec![
1042+
StructField::not_null("a", DataType::INTEGER),
1043+
StructField::not_null("b", DataType::INTEGER),
1044+
StructField::not_null("c", DataType::INTEGER),
1045+
]);
1046+
let expr = Expr::Transform(transform);
1047+
let result = evaluate_expression(
1048+
&expr,
1049+
&batch,
1050+
Some(&DataType::Struct(Box::new(output_schema))),
1051+
);
1052+
assert!(result
1053+
.unwrap_err()
1054+
.to_string()
1055+
.contains("reference invalid input field names"));
1056+
}
1057+
9901058
#[test]
9911059
fn test_struct_expression_schema_validation() {
9921060
let batch = create_test_batch();

kernel/src/expressions/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,9 @@ pub struct FieldTransform {
336336
pub exprs: Vec<ExpressionRef>,
337337
/// If true, the output expressions replace the input field instead of following after it.
338338
pub is_replace: bool,
339+
/// If true, this transform is silently ignored when the target field does not exist in the
340+
/// input. Otherwise, a missing target field produces an error.
341+
pub optional: bool,
339342
}
340343

341344
/// A transformation that efficiently represents sparse modifications to struct schemas.
@@ -382,6 +385,14 @@ impl Transform {
382385
self
383386
}
384387

388+
/// Like [`Self::with_dropped_field`], but silently ignored if the field does not exist.
389+
pub fn with_dropped_field_if_exists(mut self, name: impl Into<String>) -> Self {
390+
let field_transform = self.field_transform(name);
391+
field_transform.is_replace = true;
392+
field_transform.optional = true;
393+
self
394+
}
395+
385396
/// Specifies an expression to replace a field with.
386397
pub fn with_replaced_field(mut self, name: impl Into<String>, expr: ExpressionRef) -> Self {
387398
let field_transform = self.field_transform(name);

0 commit comments

Comments
 (0)