Commit d88ab6a

refactor: give parquet CDC options an explicit enabled flag (#22632)
## Which issue does this PR close?

- None

## Rationale for this change

The CDC options currently work as `use_content_defined_chunking: Option<CdcOptions>` with a `ConfigField` impl that accepts a bare `use_content_defined_chunking = true|false` and otherwise enables CDC implicitly when any sub-field is set. This has a few problems:

- **Naming diverges from parquet-rs.** `WriterProperties` exposes `content_defined_chunking()` / `set_content_defined_chunking(Option<CdcOptions>)` with no `use_` prefix.
- **Implicit / order-dependent on the SQL side.** Format options in `COPY ... OPTIONS` / `CREATE EXTERNAL TABLE ... OPTIONS` are applied from a `HashMap` (non-deterministic order). With the old bare-boolean form, mixing `... = false` with a sub-field, or setting a sub-field after `= false`, could resolve to enabled or disabled depending on iteration order.
- **Extra machinery.** Supporting the bare boolean required a hand-written `impl ConfigField for CdcOptions` + `impl ConfigField for Option<CdcOptions>` and a `#[expect(clippy::should_implement_trait)]` workaround, plus a zero-sentinel fallback in the proto mapping.

Since CDC is unreleased, the config/proto surface can still be changed freely.

## What changes are included in this PR?

- Rename the `ParquetOptions` field `use_content_defined_chunking` -> `content_defined_chunking` (matches parquet-rs).
- Make `CdcOptions` a plain `config_namespace!` with an explicit `enabled: bool` field alongside the chunking parameters; the field is a bare `CdcOptions` (no longer `Option<CdcOptions>`). CDC is on if `content_defined_chunking.enabled` is true. Setting a parameter no longer implicitly enables CDC, and the result is independent of key order.
- Add `CdcOptions::enabled()` / `CdcOptions::disabled()` shorthand constructors.
- Drop the `ConfigField` impls and the `should_implement_trait` workaround; all generated by the macro now.
- Add an `enabled` field to the proto `CdcOptions` message so the proto <-> config mapping is a plain field copy in both directions (removes the presence-encoding and the zero-sentinel fallback).
- Update unit tests, regenerate config docs + the `information_schema` snapshot, and add `parquet_cdc_config.slt` documenting the resolution behavior.

## Are these changes tested?

Yes:

- `datafusion-common` config + writer unit tests (enable toggle, parameter-does-not-enable, validation, writer round-trip).
- `datafusion-proto-common` proto round-trip tests (enabled / disabled / negative norm level).
- `datafusion/core` parquet integration tests (data round-trip, page boundaries).
- sqllogictest: `parquet_cdc.slt` (end-to-end) and a new `parquet_cdc_config.slt` (config resolution / order independence).

## Are there any user-facing changes?

Yes, but only to the unreleased CDC options:

- Config key `datafusion.execution.parquet.use_content_defined_chunking` -> `datafusion.execution.parquet.content_defined_chunking.enabled` (plus `.min_chunk_size` / `.max_chunk_size` / `.norm_level`).
- The bare-boolean form is removed; enable/disable via `content_defined_chunking.enabled = true|false`.

No released API is affected.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
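The order-dependence problem described in the rationale can be illustrated with a small standalone sketch. It does not use DataFusion's real types or its `ConfigField` machinery: `apply_old`, `apply_new`, `old_enabled`, `resolve_new`, and the `Cdc` struct are hypothetical stand-ins, reduced to a single parameter, that only model the two resolution rules (presence-as-enabled versus an explicit `enabled` field).

```rust
// Hypothetical, simplified models of the two schemes; these are NOT
// DataFusion's real types or its `ConfigField` implementation.

const DEFAULT_MIN_CHUNK_SIZE: usize = 256 * 1024;

/// Old scheme: `Some(min_chunk_size)` means enabled, `None` means disabled.
fn apply_old(state: &mut Option<usize>, key: &str, value: &str) {
    match key {
        // Bare boolean: `true` enables with defaults, anything else disables.
        "" => {
            *state = match value {
                "true" => Some(DEFAULT_MIN_CHUNK_SIZE),
                _ => None,
            }
        }
        // Setting a sub-field implicitly enables CDC.
        "min_chunk_size" => *state = Some(value.parse().expect("usize")),
        other => panic!("unknown key {other}"),
    }
}

/// New scheme: `enabled` is an ordinary field next to the parameters.
#[derive(Debug, Clone, PartialEq)]
struct Cdc {
    enabled: bool,
    min_chunk_size: usize,
}

fn apply_new(state: &mut Cdc, key: &str, value: &str) {
    match key {
        "enabled" => state.enabled = value.parse().expect("bool"),
        "min_chunk_size" => state.min_chunk_size = value.parse().expect("usize"),
        other => panic!("unknown key {other}"),
    }
}

/// Applies the pairs in order and reports whether the old scheme ends up enabled.
fn old_enabled(pairs: &[(&str, &str)]) -> bool {
    let mut state = None;
    for &(k, v) in pairs {
        apply_old(&mut state, k, v);
    }
    state.is_some()
}

/// Applies the pairs in order under the new scheme and returns the final state.
fn resolve_new(pairs: &[(&str, &str)]) -> Cdc {
    let mut state = Cdc { enabled: false, min_chunk_size: DEFAULT_MIN_CHUNK_SIZE };
    for &(k, v) in pairs {
        apply_new(&mut state, k, v);
    }
    state
}

fn main() {
    // Old scheme: the same two settings resolve differently depending on order.
    assert!(old_enabled(&[("", "false"), ("min_chunk_size", "1024")]));
    assert!(!old_enabled(&[("min_chunk_size", "1024"), ("", "false")]));

    // New scheme: each key writes its own field, so order does not matter.
    let a = resolve_new(&[("enabled", "false"), ("min_chunk_size", "1024")]);
    let b = resolve_new(&[("min_chunk_size", "1024"), ("enabled", "false")]);
    assert_eq!(a, b);
    assert!(!a.enabled);
}
```

Because every key in the new scheme writes to a distinct field, applying the keys from a `HashMap` in any iteration order yields the same final state.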
1 parent 533ef35 commit d88ab6a

15 files changed

Lines changed: 470 additions & 459 deletions

datafusion/common/src/config.rs

Lines changed: 59 additions & 159 deletions
@@ -756,134 +756,50 @@ config_namespace! {
     }
 }
 
-/// Options for content-defined chunking (CDC) when writing parquet files.
-/// See [`ParquetOptions::use_content_defined_chunking`].
-///
-/// Can be enabled with default options by setting
-/// `use_content_defined_chunking` to `true`, or configured with sub-fields
-/// like `use_content_defined_chunking.min_chunk_size`.
-#[derive(Debug, Clone, PartialEq)]
-pub struct CdcOptions {
-    /// Minimum chunk size in bytes. The rolling hash will not trigger a split
-    /// until this many bytes have been accumulated. Default is 256 KiB.
-    pub min_chunk_size: usize,
-
-    /// Maximum chunk size in bytes. A split is forced when the accumulated
-    /// size exceeds this value. Default is 1 MiB.
-    pub max_chunk_size: usize,
-
-    /// Normalization level. Increasing this improves deduplication ratio
-    /// but increases fragmentation. Recommended range is [-3, 3], default is 0.
-    pub norm_level: i32,
-}
-
-// Note: `CdcOptions` intentionally does NOT implement `Default` so that the
-// blanket `impl<F: ConfigField + Default> ConfigField for Option<F>` does not
-// apply. This allows the specific `impl ConfigField for Option<CdcOptions>`
-// below to handle "true"/"false" for enabling/disabling CDC.
-// Use `CdcOptions::default()` (the inherent method) instead of `Default::default()`.
-impl CdcOptions {
-    /// Returns a new `CdcOptions` with default values.
-    #[expect(clippy::should_implement_trait)]
-    pub fn default() -> Self {
-        Self {
-            min_chunk_size: 256 * 1024,
-            max_chunk_size: 1024 * 1024,
-            norm_level: 0,
-        }
-    }
-}
+config_namespace! {
+    /// Options for content-defined chunking (CDC) when writing parquet files.
+    /// Mirrors `parquet::file::properties::CdcOptions`.
+    ///
+    /// Carried as a [`ParquetCdcOptions`] in [`ParquetOptions::content_defined_chunking`]
+    /// with an explicit `enabled` flag, so it can be toggled with dotted config
+    /// keys (`content_defined_chunking.enabled = true|false`) and the result is
+    /// independent of the order in which the keys are set.
+    pub struct ParquetCdcOptions {
+        /// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing
+        /// parquet files. When enabled, parallel writing is automatically disabled
+        /// since the chunker state must persist across row groups.
+        pub enabled: bool, default = false
 
-impl ConfigField for CdcOptions {
-    fn set(&mut self, key: &str, value: &str) -> Result<()> {
-        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
-        match key {
-            "min_chunk_size" => self.min_chunk_size.set(rem, value),
-            "max_chunk_size" => self.max_chunk_size.set(rem, value),
-            "norm_level" => self.norm_level.set(rem, value),
-            _ => _config_err!("Config value \"{}\" not found on CdcOptions", key),
-        }
-    }
+        /// Minimum chunk size in bytes. The rolling hash will not trigger a split
+        /// until this many bytes have been accumulated. Default is 256 KiB.
+        pub min_chunk_size: usize, default = 256 * 1024
 
-    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
-        let key = format!("{key_prefix}.min_chunk_size");
-        self.min_chunk_size.visit(v, &key, "Minimum chunk size in bytes. The rolling hash will not trigger a split until this many bytes have been accumulated. Default is 256 KiB.");
-        let key = format!("{key_prefix}.max_chunk_size");
-        self.max_chunk_size.visit(v, &key, "Maximum chunk size in bytes. A split is forced when the accumulated size exceeds this value. Default is 1 MiB.");
-        let key = format!("{key_prefix}.norm_level");
-        self.norm_level.visit(v, &key, "Normalization level. Increasing this improves deduplication ratio but increases fragmentation. Recommended range is [-3, 3], default is 0.");
-    }
+        /// Maximum chunk size in bytes. A split is forced when the accumulated
+        /// size exceeds this value. Default is 1 MiB.
+        pub max_chunk_size: usize, default = 1024 * 1024
 
-    fn reset(&mut self, key: &str) -> Result<()> {
-        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
-        match key {
-            "min_chunk_size" => {
-                if rem.is_empty() {
-                    self.min_chunk_size = CdcOptions::default().min_chunk_size;
-                    Ok(())
-                } else {
-                    self.min_chunk_size.reset(rem)
-                }
-            }
-            "max_chunk_size" => {
-                if rem.is_empty() {
-                    self.max_chunk_size = CdcOptions::default().max_chunk_size;
-                    Ok(())
-                } else {
-                    self.max_chunk_size.reset(rem)
-                }
-            }
-            "norm_level" => {
-                if rem.is_empty() {
-                    self.norm_level = CdcOptions::default().norm_level;
-                    Ok(())
-                } else {
-                    self.norm_level.reset(rem)
-                }
-            }
-            _ => _config_err!("Config value \"{}\" not found on CdcOptions", key),
-        }
+        /// Normalization level. Increasing this improves deduplication ratio
+        /// but increases fragmentation. Recommended range is [-3, 3], default is 0.
+        pub norm_level: i32, default = 0
     }
 }
 
-/// `ConfigField` for `Option<CdcOptions>` — allows setting the option to
-/// `"true"` (enable with defaults) or `"false"` (disable), in addition to
-/// setting individual sub-fields like `min_chunk_size`.
-impl ConfigField for Option<CdcOptions> {
-    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
-        match self {
-            Some(s) => s.visit(v, key, description),
-            None => v.none(key, description),
-        }
-    }
-
-    fn set(&mut self, key: &str, value: &str) -> Result<()> {
-        if key.is_empty() {
-            match value.to_ascii_lowercase().as_str() {
-                "true" => {
-                    *self = Some(CdcOptions::default());
-                    Ok(())
-                }
-                "false" => {
-                    *self = None;
-                    Ok(())
-                }
-                _ => _config_err!(
-                    "Expected 'true' or 'false' for use_content_defined_chunking, got '{value}'"
-                ),
-            }
-        } else {
-            self.get_or_insert_with(CdcOptions::default).set(key, value)
+impl ParquetCdcOptions {
+    /// Returns enabled CDC options with the default chunking parameters.
+    ///
+    /// Shorthand for `ParquetCdcOptions { enabled: true, ..Default::default() }`;
+    /// combine with struct-update syntax to override parameters, e.g.
+    /// `ParquetCdcOptions { min_chunk_size: 4096, ..ParquetCdcOptions::enabled() }`.
+    pub fn enabled() -> Self {
+        Self {
+            enabled: true,
+            ..Default::default()
         }
     }
 
-    fn reset(&mut self, key: &str) -> Result<()> {
-        if key.is_empty() {
-            *self = None;
-            Ok(())
-        } else {
-            self.get_or_insert_with(CdcOptions::default).reset(key)
-        }
+    /// Returns disabled CDC options (equivalent to [`ParquetCdcOptions::default`]).
+    pub fn disabled() -> Self {
+        Self::default()
     }
 }
 
@@ -1083,11 +999,14 @@ config_namespace! {
         /// data frame.
         pub maximum_buffered_record_batches_per_stream: usize, default = 2
 
-        /// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing
-        /// parquet files. When `Some`, CDC is enabled with the given options; when `None`
-        /// (the default), CDC is disabled. When CDC is enabled, parallel writing is
-        /// automatically disabled since the chunker state must persist across row groups.
-        pub use_content_defined_chunking: Option<CdcOptions>, default = None
+        /// (writing) EXPERIMENTAL: Content-defined chunking (CDC) options when writing
+        /// parquet files. Disabled by default; toggle with
+        /// `content_defined_chunking.enabled = true|false`. The chunking parameters live
+        /// under the same prefix (e.g. `content_defined_chunking.min_chunk_size`). When
+        /// enabled, parallel writing is automatically disabled since the chunker state
+        /// must persist across row groups. Mirrors
+        /// `parquet::file::properties::WriterProperties::content_defined_chunking`.
+        pub content_defined_chunking: ParquetCdcOptions, default = Default::default()
     }
 }
 
@@ -4111,73 +4030,54 @@ mod tests {
 
     #[cfg(feature = "parquet")]
     #[test]
-    fn set_cdc_option_with_boolean_true() {
+    fn set_cdc_enabled_flag() {
         use crate::config::ConfigOptions;
 
         let mut config = ConfigOptions::default();
-        assert!(
-            config
-                .execution
-                .parquet
-                .use_content_defined_chunking
-                .is_none()
-        );
+        // CDC is disabled by default.
+        assert!(!config.execution.parquet.content_defined_chunking.enabled);
 
-        // Setting to "true" should enable CDC with default options
+        // `.enabled = true` enables CDC; parameters keep their defaults.
         config
             .set(
-                "datafusion.execution.parquet.use_content_defined_chunking",
+                "datafusion.execution.parquet.content_defined_chunking.enabled",
                 "true",
             )
             .unwrap();
-        let cdc = config
-            .execution
-            .parquet
-            .use_content_defined_chunking
-            .as_ref()
-            .expect("CDC should be enabled");
+        let cdc = &config.execution.parquet.content_defined_chunking;
+        assert!(cdc.enabled);
         assert_eq!(cdc.min_chunk_size, 256 * 1024);
         assert_eq!(cdc.max_chunk_size, 1024 * 1024);
         assert_eq!(cdc.norm_level, 0);
 
-        // Setting to "false" should disable CDC
+        // `.enabled = false` disables CDC.
         config
             .set(
-                "datafusion.execution.parquet.use_content_defined_chunking",
+                "datafusion.execution.parquet.content_defined_chunking.enabled",
                 "false",
             )
             .unwrap();
-        assert!(
-            config
-                .execution
-                .parquet
-                .use_content_defined_chunking
-                .is_none()
-        );
+        assert!(!config.execution.parquet.content_defined_chunking.enabled);
     }
 
     #[cfg(feature = "parquet")]
     #[test]
-    fn set_cdc_option_with_subfields() {
+    fn set_cdc_param_does_not_enable() {
         use crate::config::ConfigOptions;
 
         let mut config = ConfigOptions::default();
 
-        // Setting sub-fields should also enable CDC
+        // Setting a parameter does NOT enable CDC (`enabled` is a distinct field,
+        // defaulting to false), and the result is independent of key order.
         config
             .set(
-                "datafusion.execution.parquet.use_content_defined_chunking.min_chunk_size",
+                "datafusion.execution.parquet.content_defined_chunking.min_chunk_size",
                 "1024",
             )
             .unwrap();
-        let cdc = config
-            .execution
-            .parquet
-            .use_content_defined_chunking
-            .as_ref()
-            .expect("CDC should be enabled");
+        let cdc = &config.execution.parquet.content_defined_chunking;
+        assert!(!cdc.enabled);
         assert_eq!(cdc.min_chunk_size, 1024);
-        // Other fields should be defaults
         assert_eq!(cdc.max_chunk_size, 1024 * 1024);
         assert_eq!(cdc.norm_level, 0);
     }
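
The `enabled()` / `disabled()` constructors added in this diff compose with struct-update syntax, as the doc comment on `enabled()` notes. The following is a minimal standalone sketch of that pattern. It assumes a hand-written stand-in for the macro-generated struct: the field names and default values are copied from the diff, but the manual `Default` impl and the derives are this sketch's assumptions, not the output of `config_namespace!`.

```rust
/// Stand-in for the macro-generated struct. Field names and defaults follow
/// the diff; the hand-written `Default` impl below is an assumption of this sketch.
#[derive(Debug, Clone, PartialEq)]
pub struct ParquetCdcOptions {
    pub enabled: bool,
    pub min_chunk_size: usize,
    pub max_chunk_size: usize,
    pub norm_level: i32,
}

impl Default for ParquetCdcOptions {
    fn default() -> Self {
        Self {
            enabled: false,
            min_chunk_size: 256 * 1024,
            max_chunk_size: 1024 * 1024,
            norm_level: 0,
        }
    }
}

impl ParquetCdcOptions {
    /// Enabled CDC options with the default chunking parameters.
    pub fn enabled() -> Self {
        Self {
            enabled: true,
            ..Default::default()
        }
    }

    /// Disabled CDC options (same as `Default::default()`).
    pub fn disabled() -> Self {
        Self::default()
    }
}

fn main() {
    // Override one parameter while keeping CDC enabled and the other defaults.
    let tuned = ParquetCdcOptions {
        min_chunk_size: 4096,
        ..ParquetCdcOptions::enabled()
    };
    assert!(tuned.enabled);
    assert_eq!(tuned.min_chunk_size, 4096);
    assert_eq!(tuned.max_chunk_size, 1024 * 1024);

    // Tuning a parameter on disabled options leaves CDC off.
    let off = ParquetCdcOptions {
        norm_level: 2,
        ..ParquetCdcOptions::disabled()
    };
    assert!(!off.enabled);
    println!("{tuned:?}\n{off:?}");
}
```

A field and an associated function can share the name `enabled` because Rust resolves them in different namespaces, so `cdc.enabled` and `ParquetCdcOptions::enabled()` do not conflict.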
