Skip to content

Commit 2a5be51

Browse files
authored
[branch-54] refactor: give parquet CDC options an explicit enabled flag (backport #22632) (#22648)
## Which issue does this PR close? - Backport of #22632 to `branch-54`. ## Rationale for this change Content-defined chunking (CDC) write options were added in #21110 and are slated for the 54.0.0 release. This backports the refactor in #22632 so the config/proto surface ships in its final form, before the release goes out. The CDC options previously worked as `use_content_defined_chunking: Option<CdcOptions>` with a `ConfigField` impl that accepted a bare `use_content_defined_chunking = true|false` and otherwise enabled CDC implicitly when any sub-field was set. This has a few problems: - **Naming diverges from parquet-rs.** `WriterProperties` exposes `content_defined_chunking()` / `set_content_defined_chunking(Option<CdcOptions>)` with no `use_` prefix. - **Implicit / order-dependent on the SQL side.** Format options in `COPY ... OPTIONS` / `CREATE EXTERNAL TABLE ... OPTIONS` are applied from a `HashMap` (non-deterministic order). With the old bare-boolean form, mixing `... = false` with a sub-field could resolve to enabled or disabled depending on iteration order. - **Extra machinery.** Supporting the bare boolean required hand-written `ConfigField` impls and a `#[expect(clippy::should_implement_trait)]` workaround, plus a zero-sentinel fallback in the proto mapping. Since CDC is unreleased, the config/proto surface can still be changed freely. ## What changes are included in this PR? - Rename the `ParquetOptions` field `use_content_defined_chunking` -> `content_defined_chunking` (matches parquet-rs). - Make `CdcOptions` a plain `config_namespace!` with an explicit `enabled: bool` field alongside the chunking parameters; the field is a bare `CdcOptions` (no longer `Option<CdcOptions>`). CDC is on iff `content_defined_chunking.enabled` is true. Setting a parameter no longer implicitly enables CDC, and the result is independent of key order. - Add `CdcOptions::enabled()` / `CdcOptions::disabled()` shorthand constructors. - Drop the `ConfigField` impls and the `should_implement_trait` workaround — all generated by the macro now. - Add an `enabled` field to the proto `CdcOptions` message so the proto <-> config mapping is a plain field copy in both directions. - Update unit tests, regenerate config docs + the `information_schema` snapshot, and add `parquet_cdc_config.slt` documenting the resolution behavior. ## Are these changes tested? Yes — `datafusion-common` config + writer unit tests, `datafusion-proto-common` proto round-trip tests, `datafusion/core` parquet integration tests, and sqllogictest (`parquet_cdc.slt` + new `parquet_cdc_config.slt`). Cherry-pick applied cleanly onto `branch-54`; affected crates build and the CDC unit tests pass. ## Are there any user-facing changes? Yes, but only to the unreleased CDC options: - Config key `datafusion.execution.parquet.use_content_defined_chunking` -> `datafusion.execution.parquet.content_defined_chunking.enabled` (plus `.min_chunk_size` / `.max_chunk_size` / `.norm_level`). - The bare-boolean form is removed; enable/disable via `content_defined_chunking.enabled = true|false`. No released API is affected. 🤖 Generated with [Claude Code](https://claude.com/claude-code)
1 parent b508f1f commit 2a5be51

15 files changed

Lines changed: 461 additions & 449 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 59 additions & 159 deletions
Original file line numberDiff line numberDiff line change
@@ -709,134 +709,50 @@ config_namespace! {
709709
}
710710
}
711711

712-
/// Options for content-defined chunking (CDC) when writing parquet files.
713-
/// See [`ParquetOptions::use_content_defined_chunking`].
714-
///
715-
/// Can be enabled with default options by setting
716-
/// `use_content_defined_chunking` to `true`, or configured with sub-fields
717-
/// like `use_content_defined_chunking.min_chunk_size`.
718-
#[derive(Debug, Clone, PartialEq)]
719-
pub struct CdcOptions {
720-
/// Minimum chunk size in bytes. The rolling hash will not trigger a split
721-
/// until this many bytes have been accumulated. Default is 256 KiB.
722-
pub min_chunk_size: usize,
723-
724-
/// Maximum chunk size in bytes. A split is forced when the accumulated
725-
/// size exceeds this value. Default is 1 MiB.
726-
pub max_chunk_size: usize,
727-
728-
/// Normalization level. Increasing this improves deduplication ratio
729-
/// but increases fragmentation. Recommended range is [-3, 3], default is 0.
730-
pub norm_level: i32,
731-
}
732-
733-
// Note: `CdcOptions` intentionally does NOT implement `Default` so that the
734-
// blanket `impl<F: ConfigField + Default> ConfigField for Option<F>` does not
735-
// apply. This allows the specific `impl ConfigField for Option<CdcOptions>`
736-
// below to handle "true"/"false" for enabling/disabling CDC.
737-
// Use `CdcOptions::default()` (the inherent method) instead of `Default::default()`.
738-
impl CdcOptions {
739-
/// Returns a new `CdcOptions` with default values.
740-
#[expect(clippy::should_implement_trait)]
741-
pub fn default() -> Self {
742-
Self {
743-
min_chunk_size: 256 * 1024,
744-
max_chunk_size: 1024 * 1024,
745-
norm_level: 0,
746-
}
747-
}
748-
}
712+
config_namespace! {
713+
/// Options for content-defined chunking (CDC) when writing parquet files.
714+
/// Mirrors `parquet::file::properties::CdcOptions`.
715+
///
716+
/// Carried as a [`ParquetCdcOptions`] in [`ParquetOptions::content_defined_chunking`]
717+
/// with an explicit `enabled` flag, so it can be toggled with dotted config
718+
/// keys (`content_defined_chunking.enabled = true|false`) and the result is
719+
/// independent of the order in which the keys are set.
720+
pub struct ParquetCdcOptions {
721+
/// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing
722+
/// parquet files. When enabled, parallel writing is automatically disabled
723+
/// since the chunker state must persist across row groups.
724+
pub enabled: bool, default = false
749725

750-
impl ConfigField for CdcOptions {
751-
fn set(&mut self, key: &str, value: &str) -> Result<()> {
752-
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
753-
match key {
754-
"min_chunk_size" => self.min_chunk_size.set(rem, value),
755-
"max_chunk_size" => self.max_chunk_size.set(rem, value),
756-
"norm_level" => self.norm_level.set(rem, value),
757-
_ => _config_err!("Config value \"{}\" not found on CdcOptions", key),
758-
}
759-
}
726+
/// Minimum chunk size in bytes. The rolling hash will not trigger a split
727+
/// until this many bytes have been accumulated. Default is 256 KiB.
728+
pub min_chunk_size: usize, default = 256 * 1024
760729

761-
fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
762-
let key = format!("{key_prefix}.min_chunk_size");
763-
self.min_chunk_size.visit(v, &key, "Minimum chunk size in bytes. The rolling hash will not trigger a split until this many bytes have been accumulated. Default is 256 KiB.");
764-
let key = format!("{key_prefix}.max_chunk_size");
765-
self.max_chunk_size.visit(v, &key, "Maximum chunk size in bytes. A split is forced when the accumulated size exceeds this value. Default is 1 MiB.");
766-
let key = format!("{key_prefix}.norm_level");
767-
self.norm_level.visit(v, &key, "Normalization level. Increasing this improves deduplication ratio but increases fragmentation. Recommended range is [-3, 3], default is 0.");
768-
}
730+
/// Maximum chunk size in bytes. A split is forced when the accumulated
731+
/// size exceeds this value. Default is 1 MiB.
732+
pub max_chunk_size: usize, default = 1024 * 1024
769733

770-
fn reset(&mut self, key: &str) -> Result<()> {
771-
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
772-
match key {
773-
"min_chunk_size" => {
774-
if rem.is_empty() {
775-
self.min_chunk_size = CdcOptions::default().min_chunk_size;
776-
Ok(())
777-
} else {
778-
self.min_chunk_size.reset(rem)
779-
}
780-
}
781-
"max_chunk_size" => {
782-
if rem.is_empty() {
783-
self.max_chunk_size = CdcOptions::default().max_chunk_size;
784-
Ok(())
785-
} else {
786-
self.max_chunk_size.reset(rem)
787-
}
788-
}
789-
"norm_level" => {
790-
if rem.is_empty() {
791-
self.norm_level = CdcOptions::default().norm_level;
792-
Ok(())
793-
} else {
794-
self.norm_level.reset(rem)
795-
}
796-
}
797-
_ => _config_err!("Config value \"{}\" not found on CdcOptions", key),
798-
}
734+
/// Normalization level. Increasing this improves deduplication ratio
735+
/// but increases fragmentation. Recommended range is [-3, 3], default is 0.
736+
pub norm_level: i32, default = 0
799737
}
800738
}
801739

802-
/// `ConfigField` for `Option<CdcOptions>` — allows setting the option to
803-
/// `"true"` (enable with defaults) or `"false"` (disable), in addition to
804-
/// setting individual sub-fields like `min_chunk_size`.
805-
impl ConfigField for Option<CdcOptions> {
806-
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
807-
match self {
808-
Some(s) => s.visit(v, key, description),
809-
None => v.none(key, description),
810-
}
811-
}
812-
813-
fn set(&mut self, key: &str, value: &str) -> Result<()> {
814-
if key.is_empty() {
815-
match value.to_ascii_lowercase().as_str() {
816-
"true" => {
817-
*self = Some(CdcOptions::default());
818-
Ok(())
819-
}
820-
"false" => {
821-
*self = None;
822-
Ok(())
823-
}
824-
_ => _config_err!(
825-
"Expected 'true' or 'false' for use_content_defined_chunking, got '{value}'"
826-
),
827-
}
828-
} else {
829-
self.get_or_insert_with(CdcOptions::default).set(key, value)
740+
impl ParquetCdcOptions {
741+
/// Returns enabled CDC options with the default chunking parameters.
742+
///
743+
/// Shorthand for `ParquetCdcOptions { enabled: true, ..Default::default() }`;
744+
/// combine with struct-update syntax to override parameters, e.g.
745+
/// `ParquetCdcOptions { min_chunk_size: 4096, ..ParquetCdcOptions::enabled() }`.
746+
pub fn enabled() -> Self {
747+
Self {
748+
enabled: true,
749+
..Default::default()
830750
}
831751
}
832752

833-
fn reset(&mut self, key: &str) -> Result<()> {
834-
if key.is_empty() {
835-
*self = None;
836-
Ok(())
837-
} else {
838-
self.get_or_insert_with(CdcOptions::default).reset(key)
839-
}
753+
/// Returns disabled CDC options (equivalent to [`ParquetCdcOptions::default`]).
754+
pub fn disabled() -> Self {
755+
Self::default()
840756
}
841757
}
842758

@@ -1036,11 +952,14 @@ config_namespace! {
1036952
/// data frame.
1037953
pub maximum_buffered_record_batches_per_stream: usize, default = 2
1038954

1039-
/// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing
1040-
/// parquet files. When `Some`, CDC is enabled with the given options; when `None`
1041-
/// (the default), CDC is disabled. When CDC is enabled, parallel writing is
1042-
/// automatically disabled since the chunker state must persist across row groups.
1043-
pub use_content_defined_chunking: Option<CdcOptions>, default = None
955+
/// (writing) EXPERIMENTAL: Content-defined chunking (CDC) options when writing
956+
/// parquet files. Disabled by default; toggle with
957+
/// `content_defined_chunking.enabled = true|false`. The chunking parameters live
958+
/// under the same prefix (e.g. `content_defined_chunking.min_chunk_size`). When
959+
/// enabled, parallel writing is automatically disabled since the chunker state
960+
/// must persist across row groups. Mirrors
961+
/// `parquet::file::properties::WriterProperties::content_defined_chunking`.
962+
pub content_defined_chunking: ParquetCdcOptions, default = Default::default()
1044963
}
1045964
}
1046965

@@ -4036,73 +3955,54 @@ mod tests {
40363955

40373956
#[cfg(feature = "parquet")]
40383957
#[test]
4039-
fn set_cdc_option_with_boolean_true() {
3958+
fn set_cdc_enabled_flag() {
40403959
use crate::config::ConfigOptions;
40413960

40423961
let mut config = ConfigOptions::default();
4043-
assert!(
4044-
config
4045-
.execution
4046-
.parquet
4047-
.use_content_defined_chunking
4048-
.is_none()
4049-
);
3962+
// CDC is disabled by default.
3963+
assert!(!config.execution.parquet.content_defined_chunking.enabled);
40503964

4051-
// Setting to "true" should enable CDC with default options
3965+
// `.enabled = true` enables CDC; parameters keep their defaults.
40523966
config
40533967
.set(
4054-
"datafusion.execution.parquet.use_content_defined_chunking",
3968+
"datafusion.execution.parquet.content_defined_chunking.enabled",
40553969
"true",
40563970
)
40573971
.unwrap();
4058-
let cdc = config
4059-
.execution
4060-
.parquet
4061-
.use_content_defined_chunking
4062-
.as_ref()
4063-
.expect("CDC should be enabled");
3972+
let cdc = &config.execution.parquet.content_defined_chunking;
3973+
assert!(cdc.enabled);
40643974
assert_eq!(cdc.min_chunk_size, 256 * 1024);
40653975
assert_eq!(cdc.max_chunk_size, 1024 * 1024);
40663976
assert_eq!(cdc.norm_level, 0);
40673977

4068-
// Setting to "false" should disable CDC
3978+
// `.enabled = false` disables CDC.
40693979
config
40703980
.set(
4071-
"datafusion.execution.parquet.use_content_defined_chunking",
3981+
"datafusion.execution.parquet.content_defined_chunking.enabled",
40723982
"false",
40733983
)
40743984
.unwrap();
4075-
assert!(
4076-
config
4077-
.execution
4078-
.parquet
4079-
.use_content_defined_chunking
4080-
.is_none()
4081-
);
3985+
assert!(!config.execution.parquet.content_defined_chunking.enabled);
40823986
}
40833987

40843988
#[cfg(feature = "parquet")]
40853989
#[test]
4086-
fn set_cdc_option_with_subfields() {
3990+
fn set_cdc_param_does_not_enable() {
40873991
use crate::config::ConfigOptions;
40883992

40893993
let mut config = ConfigOptions::default();
40903994

4091-
// Setting sub-fields should also enable CDC
3995+
// Setting a parameter does NOT enable CDC (`enabled` is a distinct field,
3996+
// defaulting to false), and the result is independent of key order.
40923997
config
40933998
.set(
4094-
"datafusion.execution.parquet.use_content_defined_chunking.min_chunk_size",
3999+
"datafusion.execution.parquet.content_defined_chunking.min_chunk_size",
40954000
"1024",
40964001
)
40974002
.unwrap();
4098-
let cdc = config
4099-
.execution
4100-
.parquet
4101-
.use_content_defined_chunking
4102-
.as_ref()
4103-
.expect("CDC should be enabled");
4003+
let cdc = &config.execution.parquet.content_defined_chunking;
4004+
assert!(!cdc.enabled);
41044005
assert_eq!(cdc.min_chunk_size, 1024);
4105-
// Other fields should be defaults
41064006
assert_eq!(cdc.max_chunk_size, 1024 * 1024);
41074007
assert_eq!(cdc.norm_level, 0);
41084008
}

0 commit comments

Comments
 (0)