Skip to content

Commit 757a927

Browse files
thinh2nicklan
andauthored
StructType modification method and stat_transform schema boilerplate code refactor. (delta-io#1872)
## What changes are proposed in this pull request? Closes delta-io#1657 Currently, there is no Schema modification method supporting insert/remove/replace a field from the Schema. Schema modification is left to the caller to construct, which create the boilerplate code across the codebase. This PR introduced the schema modification method for inserting, removing and replacing a field from schema. In addition, the schema modification method is integrated into the stats_transform to reduce boilerplate code when creating schema. Change included: - New StructType method for schema modification : - `pub fn with_field_inserted_after(mut self, after: Option<&str>, new_field: StructField) -> Self` - `pub fn with_field_removed(mut self, name: &str) -> Self` - `pub fn with_field_replaced(mut self, name: &str, new_field: StructField)` - `pub fn with_field_inserted_after(mut self, before: Option<&str>, new_field: StructField) -> Self` - Unit-test for StructType to verify the correct behaviour of the schema modification method `schema::tests::test_with_field_` - Update the stats_transform.rs function `transform_add_schema`, `build_add_output_schema`, `add_stats_parsed_to_add_schema` ## How was this change tested? - Unit test for Schema modification method: `cargo test --package delta_kernel --lib -- schema::tests::test_with_field_` - Unit test for stat_transform `cargo test --package delta_kernel --lib -- checkpoint::stats_transform::test` --------- Co-authored-by: Nick Lanham <nicklan@users.noreply.github.com>
1 parent fc88e2d commit 757a927

2 files changed

Lines changed: 307 additions & 61 deletions

File tree

kernel/src/checkpoint/stats_transform.rs

Lines changed: 45 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ pub(crate) fn build_checkpoint_read_schema_with_stats(
121121
"stats_parsed field already exists in Add schema",
122122
));
123123
}
124-
Ok(add_stats_parsed_to_add_schema(add_struct, stats_schema))
124+
add_stats_parsed_to_add_schema(add_struct, stats_schema)
125125
})
126126
}
127127

@@ -140,7 +140,7 @@ pub(crate) fn build_checkpoint_output_schema(
140140
stats_schema: &StructType,
141141
) -> DeltaResult<SchemaRef> {
142142
transform_add_schema(base_schema, |add_struct| {
143-
Ok(build_add_output_schema(config, add_struct, stats_schema))
143+
build_add_output_schema(config, add_struct, stats_schema)
144144
})
145145
}
146146

@@ -211,75 +211,55 @@ fn transform_add_schema(
211211
};
212212

213213
let modified_add = transform_fn(add_struct)?;
214-
let fields: Vec<StructField> = base_schema
215-
.fields()
216-
.map(|field| {
217-
if field.name == ADD_NAME {
218-
StructField {
219-
name: field.name.clone(),
220-
data_type: DataType::Struct(Box::new(modified_add.clone())),
221-
nullable: field.nullable,
222-
metadata: field.metadata.clone(),
223-
}
224-
} else {
225-
field.clone()
226-
}
227-
})
228-
.collect();
229-
230-
Ok(Arc::new(StructType::new_unchecked(fields)))
214+
let new_schema = base_schema.clone().with_field_replaced(
215+
ADD_NAME,
216+
StructField {
217+
name: ADD_NAME.to_string(),
218+
data_type: DataType::Struct(Box::new(modified_add)),
219+
nullable: add_field.nullable,
220+
metadata: add_field.metadata.clone(),
221+
},
222+
)?;
223+
224+
Ok(Arc::new(new_schema))
231225
}
232226

233227
/// Adds `stats_parsed` field after `stats` in the Add action schema.
234228
fn add_stats_parsed_to_add_schema(
235229
add_schema: &StructType,
236230
stats_schema: &StructType,
237-
) -> StructType {
238-
let mut fields: Vec<StructField> = Vec::with_capacity(add_schema.num_fields() + 1);
239-
240-
for field in add_schema.fields() {
241-
fields.push(field.clone());
242-
if field.name == STATS_FIELD {
243-
// Insert stats_parsed right after stats
244-
fields.push(StructField::nullable(
245-
STATS_PARSED_FIELD,
246-
DataType::Struct(Box::new(stats_schema.clone())),
247-
));
248-
}
249-
}
250-
251-
StructType::new_unchecked(fields)
231+
) -> DeltaResult<StructType> {
232+
let new_schema = add_schema.clone();
233+
new_schema.with_field_inserted_after(
234+
Some(STATS_FIELD),
235+
StructField::nullable(
236+
STATS_PARSED_FIELD,
237+
DataType::Struct(Box::new(stats_schema.clone())),
238+
),
239+
)
252240
}
253241

254242
fn build_add_output_schema(
255243
config: &StatsTransformConfig,
256244
add_schema: &StructType,
257245
stats_schema: &StructType,
258-
) -> StructType {
259-
let capacity = add_schema.num_fields()
260-
- if config.write_stats_as_json { 0 } else { 1 } // dropping stats?
261-
+ if config.write_stats_as_struct { 1 } else { 0 }; // adding stats_parsed?
262-
let mut fields: Vec<StructField> = Vec::with_capacity(capacity);
263-
264-
for field in add_schema.fields() {
265-
if field.name == STATS_FIELD {
266-
// Include stats if writing as JSON
267-
if config.write_stats_as_json {
268-
fields.push(field.clone());
269-
}
270-
// Add stats_parsed after stats position if writing as struct
271-
if config.write_stats_as_struct {
272-
fields.push(StructField::nullable(
273-
STATS_PARSED_FIELD,
274-
DataType::Struct(Box::new(stats_schema.clone())),
275-
));
276-
}
277-
} else {
278-
fields.push(field.clone());
279-
}
246+
) -> DeltaResult<StructType> {
247+
let mut new_schema = add_schema.clone();
248+
if config.write_stats_as_struct {
249+
new_schema = new_schema.with_field_inserted_after(
250+
Some(STATS_FIELD),
251+
StructField::nullable(
252+
STATS_PARSED_FIELD,
253+
DataType::Struct(Box::new(stats_schema.clone())),
254+
),
255+
)?
280256
}
281257

282-
StructType::new_unchecked(fields)
258+
if config.write_stats_as_json {
259+
Ok(new_schema)
260+
} else {
261+
Ok(new_schema.with_field_removed(STATS_FIELD))
262+
}
283263
}
284264

285265
#[cfg(test)]
@@ -469,7 +449,8 @@ mod tests {
469449
let stats_schema =
470450
StructType::new_unchecked([StructField::nullable("numRecords", DataType::LONG)]);
471451

472-
let result = add_stats_parsed_to_add_schema(&add_schema, &stats_schema);
452+
let result = add_stats_parsed_to_add_schema(&add_schema, &stats_schema)
453+
.expect("add stats_parsed to add schema should produce a valid schema");
473454

474455
// Should have 4 fields: path, stats, stats_parsed, tags
475456
assert_eq!(result.fields().count(), 4);
@@ -492,7 +473,8 @@ mod tests {
492473

493474
let stats_schema = StructType::new_unchecked([]);
494475

495-
let result = build_add_output_schema(&config, &add_schema, &stats_schema);
476+
let result = build_add_output_schema(&config, &add_schema, &stats_schema)
477+
.expect("build add output schema should produce a valid schema");
496478

497479
// Should have path and stats, no stats_parsed
498480
let field_names: Vec<&str> = result.fields().map(|f| f.name.as_str()).collect();
@@ -514,7 +496,8 @@ mod tests {
514496
let stats_schema =
515497
StructType::new_unchecked([StructField::nullable("numRecords", DataType::LONG)]);
516498

517-
let result = build_add_output_schema(&config, &add_schema, &stats_schema);
499+
let result = build_add_output_schema(&config, &add_schema, &stats_schema)
500+
.expect("build add output schema should produce a valid schema");
518501

519502
// Should have path and stats_parsed (stats dropped)
520503
let field_names: Vec<&str> = result.fields().map(|f| f.name.as_str()).collect();
@@ -536,7 +519,8 @@ mod tests {
536519
let stats_schema =
537520
StructType::new_unchecked([StructField::nullable("numRecords", DataType::LONG)]);
538521

539-
let result = build_add_output_schema(&config, &add_schema, &stats_schema);
522+
let result = build_add_output_schema(&config, &add_schema, &stats_schema)
523+
.expect("build add output schema should produce a valid schema");
540524

541525
// Should have path, stats, and stats_parsed
542526
let field_names: Vec<&str> = result.fields().map(|f| f.name.as_str()).collect();

0 commit comments

Comments
 (0)