Skip to content

Commit 40bc46b

Browse files
authored
refactor: Make get_cdf_transform_expr return Option<ExpressionRef> (delta-io#1401)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-incubator/delta-kernel-rs/blob/main/CONTRIBUTING.md 2. Run `cargo t --all-features --all-targets` to get started testing, and run `cargo fmt`. 3. Ensure you have added or run the appropriate tests for your PR. 4. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 5. Be sure to keep the PR description updated to reflect all changes. --> <!-- PR title formatting: This project uses conventional commits: https://www.conventionalcommits.org/ Each PR corresponds to a commit on the `main` branch, with the title of the PR (typically) being used for the commit message on main. In order to ensure proper formatting in the CHANGELOG please ensure your PR title adheres to the conventional commit specification. Examples: - new feature PR: "feat: new API for snapshot.update()" - bugfix PR: "fix: correctly apply DV in read-table example" --> ## What changes are proposed in this pull request? <!-- Please clarify what changes you are proposing and why the changes are needed. The purpose of this section is to outline the changes, why they are needed, and how this PR fixes the issue. If the reason for the change is already explained clearly in an issue, then it does not need to be restated here. 1. If you propose a new API or feature, clarify the use case for a new API or feature. 2. If you fix a bug, you can clarify why it is a bug. --> delta-io#1378 Make get_cdf_transform_expr return Option<ExpressionRef> to avoid creating unnecessary identity expressions when no transformation is needed for CDF scans. <!-- Uncomment this section if there are any changes affecting public APIs: ### This PR affects the following public APIs If there are breaking changes, please ensure the `breaking-changes` label gets added by CI, and describe why the changes are needed. Note that _new_ public APIs are not considered breaking. --> ## How was this change tested? <!-- Please make sure to add test cases that check the changes thoroughly including negative and positive cases if possible. If it was tested in a way different from regular unit tests, please clarify how you tested, ideally via a reproducible test documented in the PR description. --> Existing unit tests
1 parent 2ec1462 commit 40bc46b

2 files changed

Lines changed: 80 additions & 12 deletions

File tree

kernel/src/table_changes/physical_to_logical.rs

Lines changed: 65 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ pub(crate) fn scan_file_physical_schema(
7272

7373
// Get the transform expression for a CDF scan file
7474
//
75+
// Returns None when no transformation is needed (identity transform), otherwise returns Some(expr).
76+
//
7577
// Note: parse_partition_values returns null values for missing partition columns,
7678
// and CDF metadata columns (commit_timestamp, commit_version, change_type) are then
7779
// added to overwrite any conflicting values. This behavior can be made more strict by changing
@@ -81,7 +83,7 @@ pub(crate) fn get_cdf_transform_expr(
8183
scan_file: &CdfScanFile,
8284
state_info: &StateInfo,
8385
physical_schema: &StructType,
84-
) -> DeltaResult<ExpressionRef> {
86+
) -> DeltaResult<Option<ExpressionRef>> {
8587
let mut partition_values = HashMap::new();
8688

8789
// Get the transform spec from StateInfo (if present)
@@ -92,6 +94,11 @@ pub(crate) fn get_cdf_transform_expr(
9294
.map(|ts| ts.as_ref())
9395
.unwrap_or(&empty_spec);
9496

97+
// Return None for identity transforms to avoid unnecessary expression evaluation
98+
if transform_spec.is_empty() {
99+
return Ok(None);
100+
}
101+
95102
// Handle regular partition values using parse_partition_values
96103
let parsed_values = parse_partition_values(
97104
&state_info.logical_schema,
@@ -104,7 +111,9 @@ pub(crate) fn get_cdf_transform_expr(
104111
let cdf_values = get_cdf_columns(&state_info.logical_schema, scan_file)?;
105112
partition_values.extend(cdf_values);
106113

107-
get_transform_expr(transform_spec, partition_values, physical_schema)
114+
let expr = get_transform_expr(transform_spec, partition_values, physical_schema)?;
115+
116+
Ok(Some(expr))
108117
}
109118

110119
#[cfg(test)]
@@ -192,7 +201,9 @@ mod tests {
192201
let result = get_cdf_transform_expr(&scan_file, &state_info, &physical_schema);
193202
assert!(result.is_ok());
194203

195-
let expr = result.unwrap();
204+
let expr_opt = result.unwrap();
205+
assert!(expr_opt.is_some(), "Expected Some(expr) but got None");
206+
let expr = expr_opt.unwrap();
196207
let Expression::Transform(transform) = expr.as_ref() else {
197208
panic!("Expected Transform expression");
198209
};
@@ -236,7 +247,9 @@ mod tests {
236247
let result = get_cdf_transform_expr(&scan_file, &state_info, &physical_schema);
237248
assert!(result.is_ok());
238249

239-
let expr = result.unwrap();
250+
let expr_opt = result.unwrap();
251+
assert!(expr_opt.is_some(), "Expected Some(expr) but got None");
252+
let expr = expr_opt.unwrap();
240253
let Expression::Transform(transform) = expr.as_ref() else {
241254
panic!("Expected Transform expression");
242255
};
@@ -283,7 +296,9 @@ mod tests {
283296
let result = get_cdf_transform_expr(&scan_file, &state_info, &physical_schema);
284297
assert!(result.is_ok());
285298

286-
let expr = result.unwrap();
299+
let expr_opt = result.unwrap();
300+
assert!(expr_opt.is_some(), "Expected Some(expr) but got None");
301+
let expr = expr_opt.unwrap();
287302
let Expression::Transform(transform) = expr.as_ref() else {
288303
panic!("Expected Transform expression");
289304
};
@@ -340,4 +355,49 @@ mod tests {
340355
let result = scan_file_physical_schema(&remove_file, &physical_schema);
341356
assert_eq!(result.fields().len(), 2); // No change
342357
}
358+
359+
#[test]
360+
fn test_get_cdf_transform_expr_returns_none_for_identity() {
361+
// When there's no transform spec and no CDF metadata columns in the schema,
362+
// the function should return None (identity transform)
363+
let scan_file = CdfScanFile {
364+
path: "test/file.parquet".to_string(),
365+
partition_values: HashMap::new(),
366+
scan_type: CdfScanFileType::Add,
367+
commit_version: 100,
368+
commit_timestamp: 1000000000000,
369+
dv_info: DvInfo::default(),
370+
remove_dv: None,
371+
};
372+
373+
// Create a simple schema without CDF metadata columns
374+
let logical_schema = Arc::new(StructType::new_unchecked(vec![
375+
StructField::nullable("id", DataType::STRING),
376+
StructField::nullable("name", DataType::STRING),
377+
]));
378+
379+
let physical_schema = StructType::new_unchecked(vec![
380+
StructField::nullable("id", DataType::STRING),
381+
StructField::nullable("name", DataType::STRING),
382+
]);
383+
384+
// Empty transform spec - no transformations needed
385+
let transform_spec = vec![];
386+
387+
let state_info = StateInfo {
388+
logical_schema,
389+
physical_schema: physical_schema.clone().into(),
390+
physical_predicate: PhysicalPredicate::None,
391+
transform_spec: Some(Arc::new(transform_spec)),
392+
};
393+
394+
let result = get_cdf_transform_expr(&scan_file, &state_info, &physical_schema);
395+
assert!(result.is_ok());
396+
397+
let expr_opt = result.unwrap();
398+
assert!(
399+
expr_opt.is_none(),
400+
"Expected None for identity transform but got Some(expr)"
401+
);
402+
}
343403
}

kernel/src/table_changes/scan.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -239,11 +239,14 @@ fn read_scan_file(
239239
scan_file_physical_schema(&scan_file, state_info.physical_schema.as_ref());
240240
let transform_expr = get_cdf_transform_expr(&scan_file, state_info, physical_schema.as_ref())?;
241241

242-
let phys_to_logical_eval = engine.evaluation_handler().new_expression_evaluator(
243-
physical_schema.clone(),
244-
transform_expr,
245-
state_info.logical_schema.clone().into(),
246-
);
242+
// Only create an evaluator if transformation is needed
243+
let phys_to_logical_eval = transform_expr.map(|expr| {
244+
engine.evaluation_handler().new_expression_evaluator(
245+
physical_schema.clone(),
246+
expr,
247+
state_info.logical_schema.clone().into(),
248+
)
249+
});
247250
// Determine if the scan file was derived from a deletion vector pair
248251
let is_dv_resolved_pair = scan_file.remove_dv.is_some();
249252

@@ -261,8 +264,13 @@ fn read_scan_file(
261264

262265
let result = read_result_iter.map(move |batch| -> DeltaResult<_> {
263266
let batch = batch?;
264-
// to transform the physical data into the correct logical form
265-
let logical = phys_to_logical_eval.evaluate(batch.as_ref());
267+
// Transform the physical data into the correct logical form, or pass through unchanged
268+
let logical = if let Some(ref eval) = phys_to_logical_eval {
269+
eval.evaluate(batch.as_ref())
270+
} else {
271+
// No transformation needed - pass through the batch as-is
272+
Ok(batch)
273+
};
266274
let len = logical.as_ref().map_or(0, |res| res.len());
267275
// need to split the dv_mask. what's left in dv_mask covers this result, and rest
268276
// will cover the following results. we `take()` out of `selection_vector` to avoid

0 commit comments

Comments
 (0)