Skip to content

Commit 3796eb7

Browse files
committed
refactor: simplify CDC writer wiring and nest CdcOptions proto message
Follow-up tidy-ups to the parquet CDC options: * Drop the chunk-size validation in the parquet writer path; the bounds are enforced downstream by parquet-rs, so the extra DataFusion-side checks (and their two unit tests) are redundant. Reference the `content_defined_chunking` fields directly instead of aliasing them. * Nest the proto `CdcOptions` message inside `ParquetOptions` (`parquet_options::CdcOptions`), since it is a parquet write option. Field tags are unchanged, so the wire format is unaffected. * Add `From` helpers in `datafusion-proto-common` for the config <-> proto `CdcOptions` mapping in both directions, replacing the inline field copies in the `ParquetOptions` conversions. The `datafusion-proto` `file_formats.rs` mapping stays inline (the orphan rule blocks a shared impl there) and only switches to the nested type path. Regenerated prost/pbjson for proto-common and proto-models.
1 parent 81e756c commit 3796eb7

8 files changed

Lines changed: 222 additions & 241 deletions

File tree

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 3 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -250,23 +250,11 @@ impl ParquetOptions {
250250
builder = builder.set_encoding(parse_encoding_string(encoding)?);
251251
}
252252
if content_defined_chunking.enabled {
253-
let cdc = content_defined_chunking;
254-
if cdc.min_chunk_size == 0 {
255-
return Err(DataFusionError::Configuration(
256-
"CDC min_chunk_size must be greater than 0".to_string(),
257-
));
258-
}
259-
if cdc.max_chunk_size <= cdc.min_chunk_size {
260-
return Err(DataFusionError::Configuration(format!(
261-
"CDC max_chunk_size ({}) must be greater than min_chunk_size ({})",
262-
cdc.max_chunk_size, cdc.min_chunk_size
263-
)));
264-
}
265253
builder = builder.set_content_defined_chunking(Some(
266254
parquet::file::properties::CdcOptions {
267-
min_chunk_size: cdc.min_chunk_size,
268-
max_chunk_size: cdc.max_chunk_size,
269-
norm_level: cdc.norm_level,
255+
min_chunk_size: content_defined_chunking.min_chunk_size,
256+
max_chunk_size: content_defined_chunking.max_chunk_size,
257+
norm_level: content_defined_chunking.norm_level,
270258
},
271259
));
272260
}
@@ -887,31 +875,6 @@ mod tests {
887875
assert_eq!(cdc.norm_level, -1);
888876
}
889877

890-
#[test]
891-
fn test_cdc_validation_zero_min_chunk_size() {
892-
let mut opts = TableParquetOptions::default();
893-
opts.global.content_defined_chunking = CdcOptions {
894-
enabled: true,
895-
min_chunk_size: 0,
896-
..CdcOptions::default()
897-
};
898-
opts.arrow_schema(&Arc::new(Schema::empty()));
899-
assert!(WriterPropertiesBuilder::try_from(&opts).is_err());
900-
}
901-
902-
#[test]
903-
fn test_cdc_validation_max_not_greater_than_min() {
904-
let mut opts = TableParquetOptions::default();
905-
opts.global.content_defined_chunking = CdcOptions {
906-
enabled: true,
907-
min_chunk_size: 512 * 1024,
908-
max_chunk_size: 256 * 1024,
909-
..CdcOptions::default()
910-
};
911-
opts.arrow_schema(&Arc::new(Schema::empty()));
912-
assert!(WriterPropertiesBuilder::try_from(&opts).is_err());
913-
}
914-
915878
#[test]
916879
fn test_bloom_filter_set_ndv_only() {
917880
// the TableParquetOptions::default, with only ndv set

datafusion/proto-common/proto/datafusion_common.proto

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -636,13 +636,15 @@ message ParquetOptions {
636636
oneof coerce_int96_tz_opt {
637637
string coerce_int96_tz = 36;
638638
}
639-
}
640639

641-
message CdcOptions {
642-
bool enabled = 1;
643-
uint64 min_chunk_size = 2;
644-
uint64 max_chunk_size = 3;
645-
int32 norm_level = 4;
640+
// Content-defined chunking (CDC) options. Nested here as it is a parquet
641+
// write option.
642+
message CdcOptions {
643+
bool enabled = 1;
644+
uint64 min_chunk_size = 2;
645+
uint64 max_chunk_size = 3;
646+
int32 norm_level = 4;
647+
}
646648
}
647649

648650
enum JoinSide {

datafusion/proto-common/src/from_proto/mod.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1130,16 +1130,22 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
11301130
max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt {
11311131
protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize),
11321132
}).unwrap_or(None),
1133-
content_defined_chunking: value.content_defined_chunking.map(|cdc| CdcOptions {
1134-
enabled: cdc.enabled,
1135-
min_chunk_size: cdc.min_chunk_size as usize,
1136-
max_chunk_size: cdc.max_chunk_size as usize,
1137-
norm_level: cdc.norm_level,
1138-
}).unwrap_or_default(),
1133+
content_defined_chunking: value.content_defined_chunking.map(CdcOptions::from).unwrap_or_default(),
11391134
})
11401135
}
11411136
}
11421137

1138+
impl From<protobuf::parquet_options::CdcOptions> for CdcOptions {
1139+
fn from(value: protobuf::parquet_options::CdcOptions) -> Self {
1140+
CdcOptions {
1141+
enabled: value.enabled,
1142+
min_chunk_size: value.min_chunk_size as usize,
1143+
max_chunk_size: value.max_chunk_size as usize,
1144+
norm_level: value.norm_level,
1145+
}
1146+
}
1147+
}
1148+
11431149
impl TryFrom<&protobuf::ParquetColumnOptions> for ParquetColumnOptions {
11441150
type Error = DataFusionError;
11451151
fn try_from(

0 commit comments

Comments
 (0)