Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions datafusion/datasource-parquet/src/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ const INITIAL_BUFFER_BYTES: usize = 1048576;
/// this size, it is flushed to object store
const BUFFER_FLUSH_BYTES: usize = 1024000;

/// CDC requires the sequential writer path because chunker state is maintained
/// inside `ArrowWriter` across row groups
fn should_use_sequential_writer(parquet_opts: &TableParquetOptions) -> bool {
!parquet_opts.global.allow_single_file_parallelism
|| parquet_opts.global.use_content_defined_chunking.is_some()
}

#[derive(Default)]
/// Factory struct used to create [ParquetFormat]
pub struct ParquetFormatFactory {
Expand Down Expand Up @@ -1370,11 +1377,7 @@ impl FileSink for ParquetSink {

while let Some((path, mut rx)) = file_stream_rx.recv().await {
let parquet_props = self.create_writer_props(&runtime, &path).await?;
// CDC requires the sequential writer: the chunker state lives in ArrowWriter
// and persists across row groups. The parallel path bypasses ArrowWriter entirely.
if !parquet_opts.global.allow_single_file_parallelism
|| parquet_opts.global.use_content_defined_chunking.is_some()
{
if should_use_sequential_writer(parquet_opts) {
let mut writer = self
.create_async_arrow_writer(
&path,
Expand Down Expand Up @@ -1816,6 +1819,7 @@ mod tests {
use parquet::arrow::parquet_to_arrow_schema;

use super::*;
use datafusion_common::config::CdcOptions;

use parquet::schema::parser::parse_message_type;

Expand Down Expand Up @@ -2025,4 +2029,30 @@ mod tests {

assert_eq!(result, expected_schema);
}

#[test]
fn sequential_when_parallel_off() {
let mut opts = TableParquetOptions::default();
opts.global.allow_single_file_parallelism = false;

assert!(should_use_sequential_writer(&opts));
}

#[test]
fn sequential_when_cdc_on() {
let mut opts = TableParquetOptions::default();
opts.global.allow_single_file_parallelism = true;
opts.global.use_content_defined_chunking = Some(CdcOptions::default());

assert!(should_use_sequential_writer(&opts));
}

#[test]
fn parallel_when_allowed_and_cdc_off() {
let mut opts = TableParquetOptions::default();
opts.global.allow_single_file_parallelism = true;
opts.global.use_content_defined_chunking = None;

assert!(!should_use_sequential_writer(&opts));
}
}
Loading