Skip to content

Commit 66eef56

Browse files
committed
refactor: rename CDC options to ParquetCdcOptions and tidy mappings
Follow-up refinements to the parquet CDC options (all unreleased):
* Rename the config struct `CdcOptions` -> `ParquetCdcOptions` for explicitness and consistency with the other parquet sub-option structs (`ParquetColumnOptions`, `ParquetEncryptionOptions`), and to disambiguate from the unrelated "change data capture" meaning of CDC.
* Drop the chunk-size validation in the parquet writer path (parquet-rs enforces the bounds) and gate the writer call on the `enabled` flag.
* Add `From` conversions for the config <-> parquet-rs and config <-> proto `CdcOptions` mappings, replacing the inline field copies. parquet-rs has no `enabled` flag, so the conversion encodes enabled <-> Option presence.
* Rename the proto message to a top-level `ParquetCdcOptions` so config and proto names line up; field tags are unchanged, so the wire format is unaffected. Regenerated prost/pbjson for proto-common and proto-models.
1 parent b874f66 commit 66eef56

10 files changed

Lines changed: 256 additions & 255 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 8 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -760,11 +760,11 @@ config_namespace! {
760760
/// Options for content-defined chunking (CDC) when writing parquet files.
761761
/// Mirrors `parquet::file::properties::CdcOptions`.
762762
///
763-
/// Carried as a [`CdcOptions`] in [`ParquetOptions::content_defined_chunking`]
763+
/// Carried as a [`ParquetCdcOptions`] in [`ParquetOptions::content_defined_chunking`]
764764
/// with an explicit `enabled` flag, so it can be toggled with dotted config
765765
/// keys (`content_defined_chunking.enabled = true|false`) and the result is
766766
/// independent of the order in which the keys are set.
767-
pub struct CdcOptions {
767+
pub struct ParquetCdcOptions {
768768
/// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing
769769
/// parquet files. When enabled, parallel writing is automatically disabled
770770
/// since the chunker state must persist across row groups.
@@ -784,20 +784,20 @@ config_namespace! {
784784
}
785785
}
786786

787-
impl CdcOptions {
787+
impl ParquetCdcOptions {
788788
/// Returns enabled CDC options with the default chunking parameters.
789789
///
790-
/// Shorthand for `CdcOptions { enabled: true, ..Default::default() }`; combine
791-
/// with struct-update syntax to override parameters, e.g.
792-
/// `CdcOptions { min_chunk_size: 4096, ..CdcOptions::enabled() }`.
790+
/// Shorthand for `ParquetCdcOptions { enabled: true, ..Default::default() }`;
791+
/// combine with struct-update syntax to override parameters, e.g.
792+
/// `ParquetCdcOptions { min_chunk_size: 4096, ..ParquetCdcOptions::enabled() }`.
793793
pub fn enabled() -> Self {
794794
Self {
795795
enabled: true,
796796
..Default::default()
797797
}
798798
}
799799

800-
/// Returns disabled CDC options (equivalent to [`CdcOptions::default`]).
800+
/// Returns disabled CDC options (equivalent to [`ParquetCdcOptions::default`]).
801801
pub fn disabled() -> Self {
802802
Self::default()
803803
}
@@ -1006,7 +1006,7 @@ config_namespace! {
10061006
/// enabled, parallel writing is automatically disabled since the chunker state
10071007
/// must persist across row groups. Mirrors
10081008
/// `parquet::file::properties::WriterProperties::content_defined_chunking`.
1009-
pub content_defined_chunking: CdcOptions, default = Default::default()
1009+
pub content_defined_chunking: ParquetCdcOptions, default = Default::default()
10101010
}
10111011
}
10121012

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 43 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::sync::Arc;
2121

2222
use crate::{
2323
_internal_datafusion_err, DataFusionError, Result,
24-
config::{ParquetOptions, TableParquetOptions},
24+
config::{ParquetCdcOptions, ParquetOptions, TableParquetOptions},
2525
};
2626

2727
use arrow::datatypes::Schema;
@@ -166,6 +166,42 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
166166
}
167167
}
168168

169+
/// Convert DataFusion's [`ParquetCdcOptions`] into parquet-rs's `Option<CdcOptions>`.
170+
///
171+
/// parquet-rs has no `enabled` flag; CDC is on when the option is `Some`. So a
172+
/// disabled [`ParquetCdcOptions`] maps to `None`, and an enabled one to `Some`
173+
/// with the chunking parameters.
174+
impl From<&ParquetCdcOptions> for Option<parquet::file::properties::CdcOptions> {
175+
fn from(value: &ParquetCdcOptions) -> Self {
176+
value
177+
.enabled
178+
.then_some(parquet::file::properties::CdcOptions {
179+
min_chunk_size: value.min_chunk_size,
180+
max_chunk_size: value.max_chunk_size,
181+
norm_level: value.norm_level,
182+
})
183+
}
184+
}
185+
186+
/// Convert parquet-rs's `Option<&CdcOptions>` back into DataFusion's
187+
/// [`ParquetCdcOptions`].
188+
///
189+
/// The presence of parquet-rs options means CDC was enabled, so `Some` maps to
190+
/// `enabled: true`; `None` yields the disabled default.
191+
impl From<Option<&parquet::file::properties::CdcOptions>> for ParquetCdcOptions {
192+
fn from(value: Option<&parquet::file::properties::CdcOptions>) -> Self {
193+
match value {
194+
Some(cdc) => ParquetCdcOptions {
195+
enabled: true,
196+
min_chunk_size: cdc.min_chunk_size,
197+
max_chunk_size: cdc.max_chunk_size,
198+
norm_level: cdc.norm_level,
199+
},
200+
None => ParquetCdcOptions::default(),
201+
}
202+
}
203+
}
204+
169205
impl ParquetOptions {
170206
/// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`].
171207
///
@@ -249,27 +285,7 @@ impl ParquetOptions {
249285
if let Some(encoding) = encoding {
250286
builder = builder.set_encoding(parse_encoding_string(encoding)?);
251287
}
252-
if content_defined_chunking.enabled {
253-
let cdc = content_defined_chunking;
254-
if cdc.min_chunk_size == 0 {
255-
return Err(DataFusionError::Configuration(
256-
"CDC min_chunk_size must be greater than 0".to_string(),
257-
));
258-
}
259-
if cdc.max_chunk_size <= cdc.min_chunk_size {
260-
return Err(DataFusionError::Configuration(format!(
261-
"CDC max_chunk_size ({}) must be greater than min_chunk_size ({})",
262-
cdc.max_chunk_size, cdc.min_chunk_size
263-
)));
264-
}
265-
builder = builder.set_content_defined_chunking(Some(
266-
parquet::file::properties::CdcOptions {
267-
min_chunk_size: cdc.min_chunk_size,
268-
max_chunk_size: cdc.max_chunk_size,
269-
norm_level: cdc.norm_level,
270-
},
271-
));
272-
}
288+
builder = builder.set_content_defined_chunking(content_defined_chunking.into());
273289

274290
Ok(builder)
275291
}
@@ -412,7 +428,7 @@ mod tests {
412428
#[cfg(feature = "parquet_encryption")]
413429
use crate::config::ConfigFileEncryptionProperties;
414430
use crate::config::{
415-
CdcOptions, ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions,
431+
ParquetCdcOptions, ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions,
416432
};
417433
use crate::parquet_config::DFParquetWriterVersion;
418434
use parquet::basic::Compression;
@@ -604,15 +620,7 @@ mod tests {
604620
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
605621
coerce_int96: None,
606622
coerce_int96_tz: None,
607-
content_defined_chunking: props
608-
.content_defined_chunking()
609-
.map(|c| CdcOptions {
610-
enabled: true,
611-
min_chunk_size: c.min_chunk_size,
612-
max_chunk_size: c.max_chunk_size,
613-
norm_level: c.norm_level,
614-
})
615-
.unwrap_or_default(),
623+
content_defined_chunking: props.content_defined_chunking().into(),
616624
},
617625
column_specific_options,
618626
key_value_metadata,
@@ -826,7 +834,7 @@ mod tests {
826834
#[test]
827835
fn test_cdc_enabled_with_custom_options() {
828836
let mut opts = TableParquetOptions::default();
829-
opts.global.content_defined_chunking = CdcOptions {
837+
opts.global.content_defined_chunking = ParquetCdcOptions {
830838
enabled: true,
831839
min_chunk_size: 128 * 1024,
832840
max_chunk_size: 512 * 1024,
@@ -854,7 +862,7 @@ mod tests {
854862
fn test_cdc_params_ignored_when_disabled() {
855863
// Parameters are customized but `enabled` is false, so CDC stays off.
856864
let mut opts = TableParquetOptions::default();
857-
opts.global.content_defined_chunking = CdcOptions {
865+
opts.global.content_defined_chunking = ParquetCdcOptions {
858866
enabled: false,
859867
min_chunk_size: 128 * 1024,
860868
max_chunk_size: 512 * 1024,
@@ -869,7 +877,7 @@ mod tests {
869877
#[test]
870878
fn test_cdc_round_trip_through_writer_props() {
871879
let mut opts = TableParquetOptions::default();
872-
opts.global.content_defined_chunking = CdcOptions {
880+
opts.global.content_defined_chunking = ParquetCdcOptions {
873881
enabled: true,
874882
min_chunk_size: 64 * 1024,
875883
max_chunk_size: 2 * 1024 * 1024,
@@ -887,31 +895,6 @@ mod tests {
887895
assert_eq!(cdc.norm_level, -1);
888896
}
889897

890-
#[test]
891-
fn test_cdc_validation_zero_min_chunk_size() {
892-
let mut opts = TableParquetOptions::default();
893-
opts.global.content_defined_chunking = CdcOptions {
894-
enabled: true,
895-
min_chunk_size: 0,
896-
..CdcOptions::default()
897-
};
898-
opts.arrow_schema(&Arc::new(Schema::empty()));
899-
assert!(WriterPropertiesBuilder::try_from(&opts).is_err());
900-
}
901-
902-
#[test]
903-
fn test_cdc_validation_max_not_greater_than_min() {
904-
let mut opts = TableParquetOptions::default();
905-
opts.global.content_defined_chunking = CdcOptions {
906-
enabled: true,
907-
min_chunk_size: 512 * 1024,
908-
max_chunk_size: 256 * 1024,
909-
..CdcOptions::default()
910-
};
911-
opts.arrow_schema(&Arc::new(Schema::empty()));
912-
assert!(WriterPropertiesBuilder::try_from(&opts).is_err());
913-
}
914-
915898
#[test]
916899
fn test_bloom_filter_set_ndv_only() {
917900
// the TableParquetOptions::default, with only ndv set

datafusion/core/tests/parquet/content_defined_chunking.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use arrow::array::{AsArray, Int32Array, StringArray};
2525
use arrow::datatypes::{DataType, Field, Int32Type, Int64Type, Schema};
2626
use arrow::record_batch::RecordBatch;
2727
use datafusion::prelude::{ParquetReadOptions, SessionContext};
28-
use datafusion_common::config::{CdcOptions, TableParquetOptions};
28+
use datafusion_common::config::{ParquetCdcOptions, TableParquetOptions};
2929
use parquet::arrow::ArrowWriter;
3030
use parquet::arrow::arrow_reader::ArrowReaderMetadata;
3131
use parquet::file::properties::WriterProperties;
@@ -97,7 +97,7 @@ async fn cdc_data_round_trip() {
9797
let batch = make_test_batch(5000);
9898

9999
let mut opts = TableParquetOptions::default();
100-
opts.global.content_defined_chunking = CdcOptions::enabled();
100+
opts.global.content_defined_chunking = ParquetCdcOptions::enabled();
101101
let props = writer_props(&mut opts, &batch.schema());
102102

103103
let tmp = write_parquet_file(&batch, props);
@@ -145,7 +145,7 @@ async fn cdc_affects_page_boundaries() {
145145

146146
// Write WITH CDC using small chunk sizes to maximize effect
147147
let mut cdc_opts = TableParquetOptions::default();
148-
cdc_opts.global.content_defined_chunking = CdcOptions {
148+
cdc_opts.global.content_defined_chunking = ParquetCdcOptions {
149149
enabled: true,
150150
min_chunk_size: 512,
151151
max_chunk_size: 2048,

datafusion/proto-common/proto/datafusion_common.proto

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -627,7 +627,7 @@ message ParquetOptions {
627627
uint64 max_predicate_cache_size = 33;
628628
}
629629

630-
CdcOptions content_defined_chunking = 35;
630+
ParquetCdcOptions content_defined_chunking = 35;
631631

632632
// Optional timezone applied to INT96-coerced timestamps when `coerce_int96`
633633
// is set. When `Some`, INT96 columns coerce to
@@ -638,7 +638,8 @@ message ParquetOptions {
638638
}
639639
}
640640

641-
message CdcOptions {
641+
// Content-defined chunking (CDC) options for writing parquet files.
642+
message ParquetCdcOptions {
642643
bool enabled = 1;
643644
uint64 min_chunk_size = 2;
644645
uint64 max_chunk_size = 3;

datafusion/proto-common/src/from_proto/mod.rs

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use datafusion_common::{
3939
DataFusionError, JoinSide, ScalarValue, Statistics, TableReference,
4040
arrow_datafusion_err,
4141
config::{
42-
CdcOptions, CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
42+
CsvOptions, JsonOptions, ParquetCdcOptions, ParquetColumnOptions, ParquetOptions,
4343
TableParquetOptions,
4444
},
4545
file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
@@ -1130,16 +1130,22 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
11301130
max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt {
11311131
protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize),
11321132
}).unwrap_or(None),
1133-
content_defined_chunking: value.content_defined_chunking.map(|cdc| CdcOptions {
1134-
enabled: cdc.enabled,
1135-
min_chunk_size: cdc.min_chunk_size as usize,
1136-
max_chunk_size: cdc.max_chunk_size as usize,
1137-
norm_level: cdc.norm_level,
1138-
}).unwrap_or_default(),
1133+
content_defined_chunking: value.content_defined_chunking.map(ParquetCdcOptions::from).unwrap_or_default(),
11391134
})
11401135
}
11411136
}
11421137

1138+
impl From<protobuf::ParquetCdcOptions> for ParquetCdcOptions {
1139+
fn from(value: protobuf::ParquetCdcOptions) -> Self {
1140+
ParquetCdcOptions {
1141+
enabled: value.enabled,
1142+
min_chunk_size: value.min_chunk_size as usize,
1143+
max_chunk_size: value.max_chunk_size as usize,
1144+
norm_level: value.norm_level,
1145+
}
1146+
}
1147+
}
1148+
11431149
impl TryFrom<&protobuf::ParquetColumnOptions> for ParquetColumnOptions {
11441150
type Error = DataFusionError;
11451151
fn try_from(
@@ -1324,7 +1330,9 @@ pub(crate) fn csv_writer_options_from_proto(
13241330

13251331
#[cfg(test)]
13261332
mod tests {
1327-
use datafusion_common::config::{CdcOptions, ParquetOptions, TableParquetOptions};
1333+
use datafusion_common::config::{
1334+
ParquetCdcOptions, ParquetOptions, TableParquetOptions,
1335+
};
13281336

13291337
fn parquet_options_proto_round_trip(opts: ParquetOptions) -> ParquetOptions {
13301338
let proto: crate::protobuf_common::ParquetOptions =
@@ -1384,7 +1392,7 @@ mod tests {
13841392
#[test]
13851393
fn test_parquet_options_cdc_enabled_round_trip() {
13861394
let opts = ParquetOptions {
1387-
content_defined_chunking: CdcOptions {
1395+
content_defined_chunking: ParquetCdcOptions {
13881396
enabled: true,
13891397
min_chunk_size: 128 * 1024,
13901398
max_chunk_size: 512 * 1024,
@@ -1403,10 +1411,10 @@ mod tests {
14031411
#[test]
14041412
fn test_parquet_options_cdc_negative_norm_level_round_trip() {
14051413
let opts = ParquetOptions {
1406-
content_defined_chunking: CdcOptions {
1414+
content_defined_chunking: ParquetCdcOptions {
14071415
enabled: true,
14081416
norm_level: -3,
1409-
..CdcOptions::default()
1417+
..ParquetCdcOptions::default()
14101418
},
14111419
..ParquetOptions::default()
14121420
};
@@ -1417,7 +1425,7 @@ mod tests {
14171425
#[test]
14181426
fn test_table_parquet_options_cdc_round_trip() {
14191427
let mut opts = TableParquetOptions::default();
1420-
opts.global.content_defined_chunking = CdcOptions {
1428+
opts.global.content_defined_chunking = ParquetCdcOptions {
14211429
enabled: true,
14221430
min_chunk_size: 64 * 1024,
14231431
max_chunk_size: 2 * 1024 * 1024,

0 commit comments

Comments
 (0)