Skip to content

Commit 30090c1

Browse files
committed
refactor: give parquet CDC options an explicit enabled flag
Content-defined chunking (CDC) write options were added in #21110 and have not been released yet (current workspace is 53.x; CDC is slated for 54.0.0), so the config and proto surfaces can still be changed freely. This reworks it before it ships. What changes: * Rename the `ParquetOptions` field `use_content_defined_chunking` -> `content_defined_chunking`. * `CdcOptions` becomes a plain `config_namespace!` with an explicit `enabled: bool` field alongside the chunking parameters, and the field is a bare `CdcOptions` (no longer `Option<CdcOptions>`). CDC is on iff `content_defined_chunking.enabled` is true. Add `CdcOptions::enabled()` / `CdcOptions::disabled()` shorthand constructors. * Drop the bespoke `impl ConfigField for CdcOptions` / `impl ConfigField for Option<CdcOptions>` and the `#[expect(clippy::should_implement_trait)]` workaround that backed the old bare-boolean form. Everything is now generated by the macro. * Add an `enabled` field to the proto `CdcOptions` message so the proto <-> config mapping is a direct field copy, dropping the previous presence-encoding and the zero-sentinel fallback for the chunk sizes. Why this is better: * Naming matches parquet-rs. parquet's `WriterProperties` exposes `content_defined_chunking()` / `set_content_defined_chunking(...)` with no `use_` prefix; the field name now lines up across the boundary. * Explicit, not magic. CDC is toggled with a real `content_defined_chunking.enabled = true|false` key instead of a special bare-boolean parse, and setting a chunking parameter no longer silently turns CDC on. * No order-dependence on the SQL side. Format options in `COPY ... OPTIONS` and `CREATE EXTERNAL TABLE ... OPTIONS` are applied from a `HashMap`, i.e. in non-deterministic order. With a separate `enabled` flag, the flag and the parameters are set independently, so the resolved config never depends on the order in which the keys happen to be applied. * Simpler. No hand-written `ConfigField` impls, no clippy hack, and the proto serialization is a plain field copy in both directions. Tests, generated config docs, and the information_schema snapshot are updated accordingly; a new `parquet_cdc_config.slt` documents the resolution behavior (enable toggle, parameter-does-not-enable, order independence).
1 parent b6d4c25 commit 30090c1

15 files changed

Lines changed: 265 additions & 254 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 59 additions & 159 deletions
Original file line numberDiff line numberDiff line change
@@ -709,134 +709,50 @@ config_namespace! {
709709
}
710710
}
711711

712-
/// Options for content-defined chunking (CDC) when writing parquet files.
713-
/// See [`ParquetOptions::use_content_defined_chunking`].
714-
///
715-
/// Can be enabled with default options by setting
716-
/// `use_content_defined_chunking` to `true`, or configured with sub-fields
717-
/// like `use_content_defined_chunking.min_chunk_size`.
718-
#[derive(Debug, Clone, PartialEq)]
719-
pub struct CdcOptions {
720-
/// Minimum chunk size in bytes. The rolling hash will not trigger a split
721-
/// until this many bytes have been accumulated. Default is 256 KiB.
722-
pub min_chunk_size: usize,
723-
724-
/// Maximum chunk size in bytes. A split is forced when the accumulated
725-
/// size exceeds this value. Default is 1 MiB.
726-
pub max_chunk_size: usize,
727-
728-
/// Normalization level. Increasing this improves deduplication ratio
729-
/// but increases fragmentation. Recommended range is [-3, 3], default is 0.
730-
pub norm_level: i32,
731-
}
732-
733-
// Note: `CdcOptions` intentionally does NOT implement `Default` so that the
734-
// blanket `impl<F: ConfigField + Default> ConfigField for Option<F>` does not
735-
// apply. This allows the specific `impl ConfigField for Option<CdcOptions>`
736-
// below to handle "true"/"false" for enabling/disabling CDC.
737-
// Use `CdcOptions::default()` (the inherent method) instead of `Default::default()`.
738-
impl CdcOptions {
739-
/// Returns a new `CdcOptions` with default values.
740-
#[expect(clippy::should_implement_trait)]
741-
pub fn default() -> Self {
742-
Self {
743-
min_chunk_size: 256 * 1024,
744-
max_chunk_size: 1024 * 1024,
745-
norm_level: 0,
746-
}
747-
}
748-
}
712+
config_namespace! {
713+
/// Options for content-defined chunking (CDC) when writing parquet files.
714+
/// Mirrors `parquet::file::properties::CdcOptions`.
715+
///
716+
/// Carried as a [`CdcOptions`] in [`ParquetOptions::content_defined_chunking`]
717+
/// with an explicit `enabled` flag, so it can be toggled with dotted config
718+
/// keys (`content_defined_chunking.enabled = true|false`) and the result is
719+
/// independent of the order in which the keys are set.
720+
pub struct CdcOptions {
721+
/// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing
722+
/// parquet files. When enabled, parallel writing is automatically disabled
723+
/// since the chunker state must persist across row groups.
724+
pub enabled: bool, default = false
749725

750-
impl ConfigField for CdcOptions {
751-
fn set(&mut self, key: &str, value: &str) -> Result<()> {
752-
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
753-
match key {
754-
"min_chunk_size" => self.min_chunk_size.set(rem, value),
755-
"max_chunk_size" => self.max_chunk_size.set(rem, value),
756-
"norm_level" => self.norm_level.set(rem, value),
757-
_ => _config_err!("Config value \"{}\" not found on CdcOptions", key),
758-
}
759-
}
726+
/// Minimum chunk size in bytes. The rolling hash will not trigger a split
727+
/// until this many bytes have been accumulated. Default is 256 KiB.
728+
pub min_chunk_size: usize, default = 256 * 1024
760729

761-
fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
762-
let key = format!("{key_prefix}.min_chunk_size");
763-
self.min_chunk_size.visit(v, &key, "Minimum chunk size in bytes. The rolling hash will not trigger a split until this many bytes have been accumulated. Default is 256 KiB.");
764-
let key = format!("{key_prefix}.max_chunk_size");
765-
self.max_chunk_size.visit(v, &key, "Maximum chunk size in bytes. A split is forced when the accumulated size exceeds this value. Default is 1 MiB.");
766-
let key = format!("{key_prefix}.norm_level");
767-
self.norm_level.visit(v, &key, "Normalization level. Increasing this improves deduplication ratio but increases fragmentation. Recommended range is [-3, 3], default is 0.");
768-
}
730+
/// Maximum chunk size in bytes. A split is forced when the accumulated
731+
/// size exceeds this value. Default is 1 MiB.
732+
pub max_chunk_size: usize, default = 1024 * 1024
769733

770-
fn reset(&mut self, key: &str) -> Result<()> {
771-
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
772-
match key {
773-
"min_chunk_size" => {
774-
if rem.is_empty() {
775-
self.min_chunk_size = CdcOptions::default().min_chunk_size;
776-
Ok(())
777-
} else {
778-
self.min_chunk_size.reset(rem)
779-
}
780-
}
781-
"max_chunk_size" => {
782-
if rem.is_empty() {
783-
self.max_chunk_size = CdcOptions::default().max_chunk_size;
784-
Ok(())
785-
} else {
786-
self.max_chunk_size.reset(rem)
787-
}
788-
}
789-
"norm_level" => {
790-
if rem.is_empty() {
791-
self.norm_level = CdcOptions::default().norm_level;
792-
Ok(())
793-
} else {
794-
self.norm_level.reset(rem)
795-
}
796-
}
797-
_ => _config_err!("Config value \"{}\" not found on CdcOptions", key),
798-
}
734+
/// Normalization level. Increasing this improves deduplication ratio
735+
/// but increases fragmentation. Recommended range is [-3, 3], default is 0.
736+
pub norm_level: i32, default = 0
799737
}
800738
}
801739

802-
/// `ConfigField` for `Option<CdcOptions>` — allows setting the option to
803-
/// `"true"` (enable with defaults) or `"false"` (disable), in addition to
804-
/// setting individual sub-fields like `min_chunk_size`.
805-
impl ConfigField for Option<CdcOptions> {
806-
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
807-
match self {
808-
Some(s) => s.visit(v, key, description),
809-
None => v.none(key, description),
810-
}
811-
}
812-
813-
fn set(&mut self, key: &str, value: &str) -> Result<()> {
814-
if key.is_empty() {
815-
match value.to_ascii_lowercase().as_str() {
816-
"true" => {
817-
*self = Some(CdcOptions::default());
818-
Ok(())
819-
}
820-
"false" => {
821-
*self = None;
822-
Ok(())
823-
}
824-
_ => _config_err!(
825-
"Expected 'true' or 'false' for use_content_defined_chunking, got '{value}'"
826-
),
827-
}
828-
} else {
829-
self.get_or_insert_with(CdcOptions::default).set(key, value)
740+
impl CdcOptions {
741+
/// Returns enabled CDC options with the default chunking parameters.
742+
///
743+
/// Shorthand for `CdcOptions { enabled: true, ..Default::default() }`; combine
744+
/// with struct-update syntax to override parameters, e.g.
745+
/// `CdcOptions { min_chunk_size: 4096, ..CdcOptions::enabled() }`.
746+
pub fn enabled() -> Self {
747+
Self {
748+
enabled: true,
749+
..Default::default()
830750
}
831751
}
832752

833-
fn reset(&mut self, key: &str) -> Result<()> {
834-
if key.is_empty() {
835-
*self = None;
836-
Ok(())
837-
} else {
838-
self.get_or_insert_with(CdcOptions::default).reset(key)
839-
}
753+
/// Returns disabled CDC options (equivalent to [`CdcOptions::default`]).
754+
pub fn disabled() -> Self {
755+
Self::default()
840756
}
841757
}
842758

@@ -1036,11 +952,14 @@ config_namespace! {
1036952
/// data frame.
1037953
pub maximum_buffered_record_batches_per_stream: usize, default = 2
1038954

1039-
/// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing
1040-
/// parquet files. When `Some`, CDC is enabled with the given options; when `None`
1041-
/// (the default), CDC is disabled. When CDC is enabled, parallel writing is
1042-
/// automatically disabled since the chunker state must persist across row groups.
1043-
pub use_content_defined_chunking: Option<CdcOptions>, default = None
955+
/// (writing) EXPERIMENTAL: Content-defined chunking (CDC) options when writing
956+
/// parquet files. Disabled by default; toggle with
957+
/// `content_defined_chunking.enabled = true|false`. The chunking parameters live
958+
/// under the same prefix (e.g. `content_defined_chunking.min_chunk_size`). When
959+
/// enabled, parallel writing is automatically disabled since the chunker state
960+
/// must persist across row groups. Mirrors
961+
/// `parquet::file::properties::WriterProperties::content_defined_chunking`.
962+
pub content_defined_chunking: CdcOptions, default = Default::default()
1044963
}
1045964
}
1046965

@@ -4025,73 +3944,54 @@ mod tests {
40253944

40263945
#[cfg(feature = "parquet")]
40273946
#[test]
4028-
fn set_cdc_option_with_boolean_true() {
3947+
fn set_cdc_enabled_flag() {
40293948
use crate::config::ConfigOptions;
40303949

40313950
let mut config = ConfigOptions::default();
4032-
assert!(
4033-
config
4034-
.execution
4035-
.parquet
4036-
.use_content_defined_chunking
4037-
.is_none()
4038-
);
3951+
// CDC is disabled by default.
3952+
assert!(!config.execution.parquet.content_defined_chunking.enabled);
40393953

4040-
// Setting to "true" should enable CDC with default options
3954+
// `.enabled = true` enables CDC; parameters keep their defaults.
40413955
config
40423956
.set(
4043-
"datafusion.execution.parquet.use_content_defined_chunking",
3957+
"datafusion.execution.parquet.content_defined_chunking.enabled",
40443958
"true",
40453959
)
40463960
.unwrap();
4047-
let cdc = config
4048-
.execution
4049-
.parquet
4050-
.use_content_defined_chunking
4051-
.as_ref()
4052-
.expect("CDC should be enabled");
3961+
let cdc = &config.execution.parquet.content_defined_chunking;
3962+
assert!(cdc.enabled);
40533963
assert_eq!(cdc.min_chunk_size, 256 * 1024);
40543964
assert_eq!(cdc.max_chunk_size, 1024 * 1024);
40553965
assert_eq!(cdc.norm_level, 0);
40563966

4057-
// Setting to "false" should disable CDC
3967+
// `.enabled = false` disables CDC.
40583968
config
40593969
.set(
4060-
"datafusion.execution.parquet.use_content_defined_chunking",
3970+
"datafusion.execution.parquet.content_defined_chunking.enabled",
40613971
"false",
40623972
)
40633973
.unwrap();
4064-
assert!(
4065-
config
4066-
.execution
4067-
.parquet
4068-
.use_content_defined_chunking
4069-
.is_none()
4070-
);
3974+
assert!(!config.execution.parquet.content_defined_chunking.enabled);
40713975
}
40723976

40733977
#[cfg(feature = "parquet")]
40743978
#[test]
4075-
fn set_cdc_option_with_subfields() {
3979+
fn set_cdc_param_does_not_enable() {
40763980
use crate::config::ConfigOptions;
40773981

40783982
let mut config = ConfigOptions::default();
40793983

4080-
// Setting sub-fields should also enable CDC
3984+
// Setting a parameter does NOT enable CDC (`enabled` is a distinct field,
3985+
// defaulting to false), and the result is independent of key order.
40813986
config
40823987
.set(
4083-
"datafusion.execution.parquet.use_content_defined_chunking.min_chunk_size",
3988+
"datafusion.execution.parquet.content_defined_chunking.min_chunk_size",
40843989
"1024",
40853990
)
40863991
.unwrap();
4087-
let cdc = config
4088-
.execution
4089-
.parquet
4090-
.use_content_defined_chunking
4091-
.as_ref()
4092-
.expect("CDC should be enabled");
3992+
let cdc = &config.execution.parquet.content_defined_chunking;
3993+
assert!(!cdc.enabled);
40933994
assert_eq!(cdc.min_chunk_size, 1024);
4094-
// Other fields should be defaults
40953995
assert_eq!(cdc.max_chunk_size, 1024 * 1024);
40963996
assert_eq!(cdc.norm_level, 0);
40973997
}

0 commit comments

Comments
 (0)