Skip to content

Commit a51971b

Browse files
authored
feat: add support for parquet content defined chunking options (#21110)
## Rationale for this change - closes #21110. Exposes the new Content-Defined Chunking (CDC) feature from parquet-rs (apache/arrow-rs#9450). ## What changes are included in this PR? New parquet writer options for enabling CDC. ## Are these changes tested? Tests are in progress. ## Are there any user-facing changes? Yes — new config options. Depends on the arrow-rs 58.1 release.
1 parent 9ab5291 commit a51971b

File tree

15 files changed

+1053
-13
lines changed

15 files changed

+1053
-13
lines changed

datafusion/common/src/config.rs

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,137 @@ config_namespace! {
687687
}
688688
}
689689

690+
/// Options for content-defined chunking (CDC) when writing parquet files.
/// See [`ParquetOptions::use_content_defined_chunking`].
///
/// Can be enabled with default options by setting
/// `use_content_defined_chunking` to `true`, or configured with sub-fields
/// like `use_content_defined_chunking.min_chunk_size`.
#[derive(Debug, Clone, PartialEq)]
pub struct CdcOptions {
    /// Minimum chunk size in bytes. The rolling hash will not trigger a split
    /// until this many bytes have been accumulated. Default is 256 KiB.
    pub min_chunk_size: usize,

    /// Maximum chunk size in bytes. A split is forced when the accumulated
    /// size exceeds this value. Default is 1 MiB.
    pub max_chunk_size: usize,

    /// Normalization level. Increasing this improves deduplication ratio
    /// but increases fragmentation. Recommended range is [-3, 3], default is 0.
    pub norm_level: i32,
}

// Note: `CdcOptions` intentionally does NOT implement `Default` so that the
// blanket `impl<F: ConfigField + Default> ConfigField for Option<F>` does not
// apply. This allows the specific `impl ConfigField for Option<CdcOptions>`
// below to handle "true"/"false" for enabling/disabling CDC.
// Use `CdcOptions::default()` (the inherent method) instead of `Default::default()`.
impl CdcOptions {
    /// Default minimum chunk size: 256 KiB.
    pub const DEFAULT_MIN_CHUNK_SIZE: usize = 256 * 1024;
    /// Default maximum chunk size: 1 MiB.
    pub const DEFAULT_MAX_CHUNK_SIZE: usize = 1024 * 1024;
    /// Default normalization level (no extra normalization).
    pub const DEFAULT_NORM_LEVEL: i32 = 0;

    /// Returns a new `CdcOptions` with default values.
    ///
    /// Named constants are used so that `reset` implementations and tests can
    /// refer to the same defaults without repeating the literals.
    #[expect(clippy::should_implement_trait)]
    pub fn default() -> Self {
        Self {
            min_chunk_size: Self::DEFAULT_MIN_CHUNK_SIZE,
            max_chunk_size: Self::DEFAULT_MAX_CHUNK_SIZE,
            norm_level: Self::DEFAULT_NORM_LEVEL,
        }
    }
}
727+
728+
impl ConfigField for CdcOptions {
729+
fn set(&mut self, key: &str, value: &str) -> Result<()> {
730+
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
731+
match key {
732+
"min_chunk_size" => self.min_chunk_size.set(rem, value),
733+
"max_chunk_size" => self.max_chunk_size.set(rem, value),
734+
"norm_level" => self.norm_level.set(rem, value),
735+
_ => _config_err!("Config value \"{}\" not found on CdcOptions", key),
736+
}
737+
}
738+
739+
fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
740+
let key = format!("{key_prefix}.min_chunk_size");
741+
self.min_chunk_size.visit(v, &key, "Minimum chunk size in bytes. The rolling hash will not trigger a split until this many bytes have been accumulated. Default is 256 KiB.");
742+
let key = format!("{key_prefix}.max_chunk_size");
743+
self.max_chunk_size.visit(v, &key, "Maximum chunk size in bytes. A split is forced when the accumulated size exceeds this value. Default is 1 MiB.");
744+
let key = format!("{key_prefix}.norm_level");
745+
self.norm_level.visit(v, &key, "Normalization level. Increasing this improves deduplication ratio but increases fragmentation. Recommended range is [-3, 3], default is 0.");
746+
}
747+
748+
fn reset(&mut self, key: &str) -> Result<()> {
749+
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
750+
match key {
751+
"min_chunk_size" => {
752+
if rem.is_empty() {
753+
self.min_chunk_size = CdcOptions::default().min_chunk_size;
754+
Ok(())
755+
} else {
756+
self.min_chunk_size.reset(rem)
757+
}
758+
}
759+
"max_chunk_size" => {
760+
if rem.is_empty() {
761+
self.max_chunk_size = CdcOptions::default().max_chunk_size;
762+
Ok(())
763+
} else {
764+
self.max_chunk_size.reset(rem)
765+
}
766+
}
767+
"norm_level" => {
768+
if rem.is_empty() {
769+
self.norm_level = CdcOptions::default().norm_level;
770+
Ok(())
771+
} else {
772+
self.norm_level.reset(rem)
773+
}
774+
}
775+
_ => _config_err!("Config value \"{}\" not found on CdcOptions", key),
776+
}
777+
}
778+
}
779+
780+
/// `ConfigField` for `Option<CdcOptions>` — allows setting the option to
781+
/// `"true"` (enable with defaults) or `"false"` (disable), in addition to
782+
/// setting individual sub-fields like `min_chunk_size`.
783+
impl ConfigField for Option<CdcOptions> {
784+
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
785+
match self {
786+
Some(s) => s.visit(v, key, description),
787+
None => v.none(key, description),
788+
}
789+
}
790+
791+
fn set(&mut self, key: &str, value: &str) -> Result<()> {
792+
if key.is_empty() {
793+
match value.to_ascii_lowercase().as_str() {
794+
"true" => {
795+
*self = Some(CdcOptions::default());
796+
Ok(())
797+
}
798+
"false" => {
799+
*self = None;
800+
Ok(())
801+
}
802+
_ => _config_err!(
803+
"Expected 'true' or 'false' for use_content_defined_chunking, got '{value}'"
804+
),
805+
}
806+
} else {
807+
self.get_or_insert_with(CdcOptions::default).set(key, value)
808+
}
809+
}
810+
811+
fn reset(&mut self, key: &str) -> Result<()> {
812+
if key.is_empty() {
813+
*self = None;
814+
Ok(())
815+
} else {
816+
self.get_or_insert_with(CdcOptions::default).reset(key)
817+
}
818+
}
819+
}
820+
690821
config_namespace! {
691822
/// Options for reading and writing parquet files
692823
///
@@ -872,6 +1003,12 @@ config_namespace! {
8721003
/// writing out already in-memory data, such as from a cached
8731004
/// data frame.
8741005
pub maximum_buffered_record_batches_per_stream: usize, default = 2
1006+
1007+
/// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing
1008+
/// parquet files. When `Some`, CDC is enabled with the given options; when `None`
1009+
/// (the default), CDC is disabled. When CDC is enabled, parallel writing is
1010+
/// automatically disabled since the chunker state must persist across row groups.
1011+
pub use_content_defined_chunking: Option<CdcOptions>, default = None
8751012
}
8761013
}
8771014

@@ -1826,6 +1963,7 @@ config_field!(usize);
18261963
config_field!(f64);
18271964
config_field!(u64);
18281965
config_field!(u32);
1966+
config_field!(i32);
18291967

18301968
impl ConfigField for u8 {
18311969
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
@@ -3579,4 +3717,77 @@ mod tests {
35793717
"Invalid or Unsupported Configuration: Invalid parquet writer version: 3.0. Expected one of: 1.0, 2.0"
35803718
);
35813719
}
3720+
3721+
#[cfg(feature = "parquet")]
#[test]
fn set_cdc_option_with_boolean_true() {
    use crate::config::ConfigOptions;

    let key = "datafusion.execution.parquet.use_content_defined_chunking";
    let mut config = ConfigOptions::default();

    // CDC is disabled out of the box.
    assert!(
        config
            .execution
            .parquet
            .use_content_defined_chunking
            .is_none()
    );

    // "true" enables CDC with the default options.
    config.set(key, "true").unwrap();
    let cdc = config
        .execution
        .parquet
        .use_content_defined_chunking
        .as_ref()
        .expect("CDC should be enabled");
    assert_eq!(cdc.min_chunk_size, 256 * 1024);
    assert_eq!(cdc.max_chunk_size, 1024 * 1024);
    assert_eq!(cdc.norm_level, 0);

    // "false" disables CDC again.
    config.set(key, "false").unwrap();
    assert!(
        config
            .execution
            .parquet
            .use_content_defined_chunking
            .is_none()
    );
}
3767+
3768+
#[cfg(feature = "parquet")]
#[test]
fn set_cdc_option_with_subfields() {
    use crate::config::ConfigOptions;

    let mut config = ConfigOptions::default();

    // Writing a sub-field implicitly enables CDC.
    config
        .set(
            "datafusion.execution.parquet.use_content_defined_chunking.min_chunk_size",
            "1024",
        )
        .unwrap();

    let cdc = config
        .execution
        .parquet
        .use_content_defined_chunking
        .as_ref()
        .expect("CDC should be enabled");

    // The explicitly set field carries the new value...
    assert_eq!(cdc.min_chunk_size, 1024);
    // ...while the untouched fields keep their defaults.
    assert_eq!(cdc.max_chunk_size, 1024 * 1024);
    assert_eq!(cdc.norm_level, 0);
}
35823793
}

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 101 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
9595
global,
9696
column_specific_options,
9797
key_value_metadata,
98-
crypto: _,
98+
..
9999
} = table_parquet_options;
100100

101101
let mut builder = global.into_writer_properties_builder()?;
@@ -191,6 +191,7 @@ impl ParquetOptions {
191191
bloom_filter_on_write,
192192
bloom_filter_fpp,
193193
bloom_filter_ndv,
194+
use_content_defined_chunking,
194195

195196
// not in WriterProperties
196197
enable_page_index: _,
@@ -247,6 +248,26 @@ impl ParquetOptions {
247248
if let Some(encoding) = encoding {
248249
builder = builder.set_encoding(parse_encoding_string(encoding)?);
249250
}
251+
if let Some(cdc) = use_content_defined_chunking {
252+
if cdc.min_chunk_size == 0 {
253+
return Err(DataFusionError::Configuration(
254+
"CDC min_chunk_size must be greater than 0".to_string(),
255+
));
256+
}
257+
if cdc.max_chunk_size <= cdc.min_chunk_size {
258+
return Err(DataFusionError::Configuration(format!(
259+
"CDC max_chunk_size ({}) must be greater than min_chunk_size ({})",
260+
cdc.max_chunk_size, cdc.min_chunk_size
261+
)));
262+
}
263+
builder = builder.set_content_defined_chunking(Some(
264+
parquet::file::properties::CdcOptions {
265+
min_chunk_size: cdc.min_chunk_size,
266+
max_chunk_size: cdc.max_chunk_size,
267+
norm_level: cdc.norm_level,
268+
},
269+
));
270+
}
250271

251272
Ok(builder)
252273
}
@@ -388,7 +409,9 @@ mod tests {
388409
use super::*;
389410
#[cfg(feature = "parquet_encryption")]
390411
use crate::config::ConfigFileEncryptionProperties;
391-
use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions};
412+
use crate::config::{
413+
CdcOptions, ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions,
414+
};
392415
use crate::parquet_config::DFParquetWriterVersion;
393416
use parquet::basic::Compression;
394417
use parquet::file::properties::{
@@ -460,6 +483,7 @@ mod tests {
460483
skip_arrow_metadata: defaults.skip_arrow_metadata,
461484
coerce_int96: None,
462485
max_predicate_cache_size: defaults.max_predicate_cache_size,
486+
use_content_defined_chunking: defaults.use_content_defined_chunking.clone(),
463487
}
464488
}
465489

@@ -576,6 +600,13 @@ mod tests {
576600
binary_as_string: global_options_defaults.binary_as_string,
577601
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
578602
coerce_int96: None,
603+
use_content_defined_chunking: props.content_defined_chunking().map(|c| {
604+
CdcOptions {
605+
min_chunk_size: c.min_chunk_size,
606+
max_chunk_size: c.max_chunk_size,
607+
norm_level: c.norm_level,
608+
}
609+
}),
579610
},
580611
column_specific_options,
581612
key_value_metadata,
@@ -786,6 +817,74 @@ mod tests {
786817
);
787818
}
788819

820+
#[test]
fn test_cdc_enabled_with_custom_options() {
    // Build table options with a fully customized CDC configuration.
    let mut table_opts = TableParquetOptions::default();
    table_opts.global.use_content_defined_chunking = Some(CdcOptions {
        min_chunk_size: 128 * 1024,
        max_chunk_size: 512 * 1024,
        norm_level: 2,
    });
    table_opts.arrow_schema(&Arc::new(Schema::empty()));

    // All custom values must survive conversion into WriterProperties.
    let props = WriterPropertiesBuilder::try_from(&table_opts)
        .unwrap()
        .build();
    let cdc = props.content_defined_chunking().expect("CDC should be set");
    assert_eq!(cdc.min_chunk_size, 128 * 1024);
    assert_eq!(cdc.max_chunk_size, 512 * 1024);
    assert_eq!(cdc.norm_level, 2);
}
836+
837+
#[test]
fn test_cdc_disabled_by_default() {
    // Without any CDC configuration, the writer properties carry no chunker.
    let mut table_opts = TableParquetOptions::default();
    table_opts.arrow_schema(&Arc::new(Schema::empty()));

    let props = WriterPropertiesBuilder::try_from(&table_opts)
        .unwrap()
        .build();
    assert!(props.content_defined_chunking().is_none());
}
845+
846+
#[test]
fn test_cdc_round_trip_through_writer_props() {
    // Configure CDC with non-default values, including a negative norm level.
    let mut table_opts = TableParquetOptions::default();
    table_opts.global.use_content_defined_chunking = Some(CdcOptions {
        min_chunk_size: 64 * 1024,
        max_chunk_size: 2 * 1024 * 1024,
        norm_level: -1,
    });
    table_opts.arrow_schema(&Arc::new(Schema::empty()));

    // Convert to WriterProperties and back into session config.
    let props = WriterPropertiesBuilder::try_from(&table_opts)
        .unwrap()
        .build();
    let recovered = session_config_from_writer_props(&props);

    // The round-tripped options must match what was originally set.
    let cdc = recovered.global.use_content_defined_chunking.unwrap();
    assert_eq!(cdc.min_chunk_size, 64 * 1024);
    assert_eq!(cdc.max_chunk_size, 2 * 1024 * 1024);
    assert_eq!(cdc.norm_level, -1);
}
864+
865+
#[test]
fn test_cdc_validation_zero_min_chunk_size() {
    // A zero minimum chunk size must be rejected during conversion.
    let mut table_opts = TableParquetOptions::default();
    table_opts.global.use_content_defined_chunking = Some(CdcOptions {
        min_chunk_size: 0,
        ..CdcOptions::default()
    });
    table_opts.arrow_schema(&Arc::new(Schema::empty()));
    assert!(WriterPropertiesBuilder::try_from(&table_opts).is_err());
}
875+
876+
#[test]
fn test_cdc_validation_max_not_greater_than_min() {
    // max_chunk_size must be strictly greater than min_chunk_size.
    let mut table_opts = TableParquetOptions::default();
    table_opts.global.use_content_defined_chunking = Some(CdcOptions {
        min_chunk_size: 512 * 1024,
        max_chunk_size: 256 * 1024,
        ..CdcOptions::default()
    });
    table_opts.arrow_schema(&Arc::new(Schema::empty()));
    assert!(WriterPropertiesBuilder::try_from(&table_opts).is_err());
}
887+
789888
#[test]
790889
fn test_bloom_filter_set_ndv_only() {
791890
// the TableParquetOptions::default, with only ndv set

0 commit comments

Comments
 (0)