Skip to content

Commit 6d9f102

Browse files
committed
refactor: give parquet CDC options an explicit enabled flag
Content-defined chunking (CDC) write options were added in #21110 and have not been released yet (current workspace is 53.x; CDC is slated for 54.0.0), so the config and proto surfaces can still be changed freely. This reworks it before it ships. What changes: * Rename the `ParquetOptions` field `use_content_defined_chunking` -> `content_defined_chunking`. * `CdcOptions` becomes a plain `config_namespace!` with an explicit `enabled: bool` field alongside the chunking parameters, and the field is a bare `CdcOptions` (no longer `Option<CdcOptions>`). CDC is on iff `content_defined_chunking.enabled` is true. Add `CdcOptions::enabled()` / `CdcOptions::disabled()` shorthand constructors. * Drop the bespoke `impl ConfigField for CdcOptions` / `impl ConfigField for Option<CdcOptions>` and the `#[expect(clippy::should_implement_trait)]` workaround that backed the old bare-boolean form. Everything is now generated by the macro. * Add an `enabled` field to the proto `CdcOptions` message so the proto <-> config mapping is a direct field copy, dropping the previous presence-encoding and the zero-sentinel fallback for the chunk sizes. Why this is better: * Naming matches parquet-rs. parquet's `WriterProperties` exposes `content_defined_chunking()` / `set_content_defined_chunking(...)` with no `use_` prefix; the field name now lines up across the boundary. * Explicit, not magic. CDC is toggled with a real `content_defined_chunking.enabled = true|false` key instead of a special bare-boolean parse, and setting a chunking parameter no longer silently turns CDC on. * No order-dependence on the SQL side. Format options in `COPY ... OPTIONS` and `CREATE EXTERNAL TABLE ... OPTIONS` are applied from a `HashMap`, i.e. in non-deterministic order. With a separate `enabled` flag, the flag and the parameters are set independently, so the resolved config never depends on the order in which the keys happen to be applied. * Simpler. No hand-written `ConfigField` impls, no clippy hack, and the proto serialization is a plain field copy in both directions. Tests, generated config docs, and the information_schema snapshot are updated accordingly; a new `parquet_cdc_config.slt` documents the resolution behavior (enable toggle, parameter-does-not-enable, order independence).
1 parent a8761a6 commit 6d9f102

15 files changed

Lines changed: 268 additions & 264 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 59 additions & 159 deletions
Original file line numberDiff line numberDiff line change
@@ -709,134 +709,50 @@ config_namespace! {
709709
}
710710
}
711711

712-
/// Options for content-defined chunking (CDC) when writing parquet files.
713-
/// See [`ParquetOptions::use_content_defined_chunking`].
714-
///
715-
/// Can be enabled with default options by setting
716-
/// `use_content_defined_chunking` to `true`, or configured with sub-fields
717-
/// like `use_content_defined_chunking.min_chunk_size`.
718-
#[derive(Debug, Clone, PartialEq)]
719-
pub struct CdcOptions {
720-
/// Minimum chunk size in bytes. The rolling hash will not trigger a split
721-
/// until this many bytes have been accumulated. Default is 256 KiB.
722-
pub min_chunk_size: usize,
723-
724-
/// Maximum chunk size in bytes. A split is forced when the accumulated
725-
/// size exceeds this value. Default is 1 MiB.
726-
pub max_chunk_size: usize,
727-
728-
/// Normalization level. Increasing this improves deduplication ratio
729-
/// but increases fragmentation. Recommended range is [-3, 3], default is 0.
730-
pub norm_level: i32,
731-
}
732-
733-
// Note: `CdcOptions` intentionally does NOT implement `Default` so that the
734-
// blanket `impl<F: ConfigField + Default> ConfigField for Option<F>` does not
735-
// apply. This allows the specific `impl ConfigField for Option<CdcOptions>`
736-
// below to handle "true"/"false" for enabling/disabling CDC.
737-
// Use `CdcOptions::default()` (the inherent method) instead of `Default::default()`.
738-
impl CdcOptions {
739-
/// Returns a new `CdcOptions` with default values.
740-
#[expect(clippy::should_implement_trait)]
741-
pub fn default() -> Self {
742-
Self {
743-
min_chunk_size: 256 * 1024,
744-
max_chunk_size: 1024 * 1024,
745-
norm_level: 0,
746-
}
747-
}
748-
}
712+
config_namespace! {
713+
/// Options for content-defined chunking (CDC) when writing parquet files.
714+
/// Mirrors `parquet::file::properties::CdcOptions`.
715+
///
716+
/// Carried as a [`CdcOptions`] in [`ParquetOptions::content_defined_chunking`]
717+
/// with an explicit `enabled` flag, so it can be toggled with dotted config
718+
/// keys (`content_defined_chunking.enabled = true|false`) and the result is
719+
/// independent of the order in which the keys are set.
720+
pub struct CdcOptions {
721+
/// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing
722+
/// parquet files. When enabled, parallel writing is automatically disabled
723+
/// since the chunker state must persist across row groups.
724+
pub enabled: bool, default = false
749725

750-
impl ConfigField for CdcOptions {
751-
fn set(&mut self, key: &str, value: &str) -> Result<()> {
752-
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
753-
match key {
754-
"min_chunk_size" => self.min_chunk_size.set(rem, value),
755-
"max_chunk_size" => self.max_chunk_size.set(rem, value),
756-
"norm_level" => self.norm_level.set(rem, value),
757-
_ => _config_err!("Config value \"{}\" not found on CdcOptions", key),
758-
}
759-
}
726+
/// Minimum chunk size in bytes. The rolling hash will not trigger a split
727+
/// until this many bytes have been accumulated. Default is 256 KiB.
728+
pub min_chunk_size: usize, default = 256 * 1024
760729

761-
fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
762-
let key = format!("{key_prefix}.min_chunk_size");
763-
self.min_chunk_size.visit(v, &key, "Minimum chunk size in bytes. The rolling hash will not trigger a split until this many bytes have been accumulated. Default is 256 KiB.");
764-
let key = format!("{key_prefix}.max_chunk_size");
765-
self.max_chunk_size.visit(v, &key, "Maximum chunk size in bytes. A split is forced when the accumulated size exceeds this value. Default is 1 MiB.");
766-
let key = format!("{key_prefix}.norm_level");
767-
self.norm_level.visit(v, &key, "Normalization level. Increasing this improves deduplication ratio but increases fragmentation. Recommended range is [-3, 3], default is 0.");
768-
}
730+
/// Maximum chunk size in bytes. A split is forced when the accumulated
731+
/// size exceeds this value. Default is 1 MiB.
732+
pub max_chunk_size: usize, default = 1024 * 1024
769733

770-
fn reset(&mut self, key: &str) -> Result<()> {
771-
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
772-
match key {
773-
"min_chunk_size" => {
774-
if rem.is_empty() {
775-
self.min_chunk_size = CdcOptions::default().min_chunk_size;
776-
Ok(())
777-
} else {
778-
self.min_chunk_size.reset(rem)
779-
}
780-
}
781-
"max_chunk_size" => {
782-
if rem.is_empty() {
783-
self.max_chunk_size = CdcOptions::default().max_chunk_size;
784-
Ok(())
785-
} else {
786-
self.max_chunk_size.reset(rem)
787-
}
788-
}
789-
"norm_level" => {
790-
if rem.is_empty() {
791-
self.norm_level = CdcOptions::default().norm_level;
792-
Ok(())
793-
} else {
794-
self.norm_level.reset(rem)
795-
}
796-
}
797-
_ => _config_err!("Config value \"{}\" not found on CdcOptions", key),
798-
}
734+
/// Normalization level. Increasing this improves deduplication ratio
735+
/// but increases fragmentation. Recommended range is [-3, 3], default is 0.
736+
pub norm_level: i32, default = 0
799737
}
800738
}
801739

802-
/// `ConfigField` for `Option<CdcOptions>` — allows setting the option to
803-
/// `"true"` (enable with defaults) or `"false"` (disable), in addition to
804-
/// setting individual sub-fields like `min_chunk_size`.
805-
impl ConfigField for Option<CdcOptions> {
806-
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
807-
match self {
808-
Some(s) => s.visit(v, key, description),
809-
None => v.none(key, description),
810-
}
811-
}
812-
813-
fn set(&mut self, key: &str, value: &str) -> Result<()> {
814-
if key.is_empty() {
815-
match value.to_ascii_lowercase().as_str() {
816-
"true" => {
817-
*self = Some(CdcOptions::default());
818-
Ok(())
819-
}
820-
"false" => {
821-
*self = None;
822-
Ok(())
823-
}
824-
_ => _config_err!(
825-
"Expected 'true' or 'false' for use_content_defined_chunking, got '{value}'"
826-
),
827-
}
828-
} else {
829-
self.get_or_insert_with(CdcOptions::default).set(key, value)
740+
impl CdcOptions {
741+
/// Returns enabled CDC options with the default chunking parameters.
742+
///
743+
/// Shorthand for `CdcOptions { enabled: true, ..Default::default() }`; combine
744+
/// with struct-update syntax to override parameters, e.g.
745+
/// `CdcOptions { min_chunk_size: 4096, ..CdcOptions::enabled() }`.
746+
pub fn enabled() -> Self {
747+
Self {
748+
enabled: true,
749+
..Default::default()
830750
}
831751
}
832752

833-
fn reset(&mut self, key: &str) -> Result<()> {
834-
if key.is_empty() {
835-
*self = None;
836-
Ok(())
837-
} else {
838-
self.get_or_insert_with(CdcOptions::default).reset(key)
839-
}
753+
/// Returns disabled CDC options (equivalent to [`CdcOptions::default`]).
754+
pub fn disabled() -> Self {
755+
Self::default()
840756
}
841757
}
842758

@@ -1036,11 +952,14 @@ config_namespace! {
1036952
/// data frame.
1037953
pub maximum_buffered_record_batches_per_stream: usize, default = 2
1038954

1039-
/// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing
1040-
/// parquet files. When `Some`, CDC is enabled with the given options; when `None`
1041-
/// (the default), CDC is disabled. When CDC is enabled, parallel writing is
1042-
/// automatically disabled since the chunker state must persist across row groups.
1043-
pub use_content_defined_chunking: Option<CdcOptions>, default = None
955+
/// (writing) EXPERIMENTAL: Content-defined chunking (CDC) options when writing
956+
/// parquet files. Disabled by default; toggle with
957+
/// `content_defined_chunking.enabled = true|false`. The chunking parameters live
958+
/// under the same prefix (e.g. `content_defined_chunking.min_chunk_size`). When
959+
/// enabled, parallel writing is automatically disabled since the chunker state
960+
/// must persist across row groups. Mirrors
961+
/// `parquet::file::properties::WriterProperties::content_defined_chunking`.
962+
pub content_defined_chunking: CdcOptions, default = Default::default()
1044963
}
1045964
}
1046965

@@ -4041,73 +3960,54 @@ mod tests {
40413960

40423961
#[cfg(feature = "parquet")]
40433962
#[test]
4044-
fn set_cdc_option_with_boolean_true() {
3963+
fn set_cdc_enabled_flag() {
40453964
use crate::config::ConfigOptions;
40463965

40473966
let mut config = ConfigOptions::default();
4048-
assert!(
4049-
config
4050-
.execution
4051-
.parquet
4052-
.use_content_defined_chunking
4053-
.is_none()
4054-
);
3967+
// CDC is disabled by default.
3968+
assert!(!config.execution.parquet.content_defined_chunking.enabled);
40553969

4056-
// Setting to "true" should enable CDC with default options
3970+
// `.enabled = true` enables CDC; parameters keep their defaults.
40573971
config
40583972
.set(
4059-
"datafusion.execution.parquet.use_content_defined_chunking",
3973+
"datafusion.execution.parquet.content_defined_chunking.enabled",
40603974
"true",
40613975
)
40623976
.unwrap();
4063-
let cdc = config
4064-
.execution
4065-
.parquet
4066-
.use_content_defined_chunking
4067-
.as_ref()
4068-
.expect("CDC should be enabled");
3977+
let cdc = &config.execution.parquet.content_defined_chunking;
3978+
assert!(cdc.enabled);
40693979
assert_eq!(cdc.min_chunk_size, 256 * 1024);
40703980
assert_eq!(cdc.max_chunk_size, 1024 * 1024);
40713981
assert_eq!(cdc.norm_level, 0);
40723982

4073-
// Setting to "false" should disable CDC
3983+
// `.enabled = false` disables CDC.
40743984
config
40753985
.set(
4076-
"datafusion.execution.parquet.use_content_defined_chunking",
3986+
"datafusion.execution.parquet.content_defined_chunking.enabled",
40773987
"false",
40783988
)
40793989
.unwrap();
4080-
assert!(
4081-
config
4082-
.execution
4083-
.parquet
4084-
.use_content_defined_chunking
4085-
.is_none()
4086-
);
3990+
assert!(!config.execution.parquet.content_defined_chunking.enabled);
40873991
}
40883992

40893993
#[cfg(feature = "parquet")]
40903994
#[test]
4091-
fn set_cdc_option_with_subfields() {
3995+
fn set_cdc_param_does_not_enable() {
40923996
use crate::config::ConfigOptions;
40933997

40943998
let mut config = ConfigOptions::default();
40953999

4096-
// Setting sub-fields should also enable CDC
4000+
// Setting a parameter does NOT enable CDC (`enabled` is a distinct field,
4001+
// defaulting to false), and the result is independent of key order.
40974002
config
40984003
.set(
4099-
"datafusion.execution.parquet.use_content_defined_chunking.min_chunk_size",
4004+
"datafusion.execution.parquet.content_defined_chunking.min_chunk_size",
41004005
"1024",
41014006
)
41024007
.unwrap();
4103-
let cdc = config
4104-
.execution
4105-
.parquet
4106-
.use_content_defined_chunking
4107-
.as_ref()
4108-
.expect("CDC should be enabled");
4008+
let cdc = &config.execution.parquet.content_defined_chunking;
4009+
assert!(!cdc.enabled);
41094010
assert_eq!(cdc.min_chunk_size, 1024);
4110-
// Other fields should be defaults
41114011
assert_eq!(cdc.max_chunk_size, 1024 * 1024);
41124012
assert_eq!(cdc.norm_level, 0);
41134013
}

0 commit comments

Comments
 (0)