
Commit 84bc876

Satyr09 and 2010YOUY01 authored
feat: add max_row_group_bytes option to ParquetOptions (apache#22649)
## Which issue does this PR close?

- Closes apache#22650.

## Rationale for this change

arrow-rs 58.0 added `WriterProperties::set_max_row_group_bytes` (PR: apache/arrow-rs#9357, Issue: apache/arrow-rs#1213), which flushes a row group when either the row-count or the byte limit is reached, whichever comes first, matching parquet-mr's `parquet.block.size`. DataFusion already consumes at least this version of arrow but does not yet expose this new byte-based setter through its config.

## What changes are included in this PR?

- Add `max_row_group_bytes: Option<usize>` (default None) to ParquetOptions in `datafusion/common/src/config.rs`.
- Wire it through `ParquetOptions::into_writer_properties_builder` to `WriterPropertiesBuilder::set_max_row_group_bytes`, with a guard that rejects Some(0) as a configuration error (arrow-rs panics on a zero byte limit).
- Plumb the field through protobuf serialization: add it to the ParquetOptions proto message and the proto-common/proto conversions, with regenerated bindings.
- Expose it as the `max_row_group_bytes` COPY / CREATE EXTERNAL TABLE format option alongside `max_row_group_size`.
- Update the generated config docs and the format options table doc.

## Are these changes tested?

Yes, run locally and passing:

Unit (datafusion-common, parquet_writer.rs):
- defaults to None, so no byte limit is propagated to WriterProperties.
- a configured value propagates to WriterProperties.
- Some(0) is rejected with a configuration error.
- the existing `table_parquet_opts_to_writer_props` round-trip and `test_defaults_match` tests were extended to cover the new field.

Protobuf round-trip (datafusion-proto-common):
- new `test_parquet_options_max_row_group_bytes_round_trip` confirms the option survives serialization to protobuf and back.

SLTs:
- new `test_files/parquet_max_row_group_bytes.slt` writes Parquet with the option set (via both COPY ... OPTIONS and session config), reads it back, asserts the data round-trips, and asserts a zero value is rejected.
- `copy.slt` exercises the option inside the existing "all supported statement overrides" COPY test.
- `information_schema.slt` updated for the new option in SHOW ALL.

Commands run locally (all pass):
- `cargo test -p datafusion-common --features parquet`
- `cargo test -p datafusion-proto-common`
- `cargo test -p datafusion-proto`
- `cargo test --test sqllogictests -- parquet_max_row_group_bytes`
- `cargo test --test sqllogictests -- information_schema`
- `cargo test --test sqllogictests -- copy`

## Are there any user-facing changes?

Additive only, does not affect existing options.

---------

Co-authored-by: Yongting You <2010youy01@gmail.com>
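The "whichever comes first" rule from the rationale can be illustrated with a small standalone sketch. This is not the arrow-rs writer: `RowGroupLimits`, `should_flush`, and `plan_row_groups` are hypothetical names, and the per-row byte sizes are supplied by the caller, whereas a real writer estimates buffered sizes and may only check the limits at batch boundaries.

```rust
/// Hypothetical limits for illustration only; not an arrow-rs or DataFusion type.
#[derive(Clone, Copy, Debug)]
pub struct RowGroupLimits {
    /// Row-count limit (always present, like `max_row_group_size`).
    pub max_rows: usize,
    /// Optional byte limit (like `max_row_group_bytes`); `None` disables it.
    pub max_bytes: Option<usize>,
}

/// Returns true once either limit has been reached.
pub fn should_flush(
    limits: RowGroupLimits,
    buffered_rows: usize,
    buffered_bytes: usize,
) -> bool {
    let rows_hit = buffered_rows >= limits.max_rows;
    let bytes_hit = limits
        .max_bytes
        .is_some_and(|max| buffered_bytes >= max);
    rows_hit || bytes_hit
}

/// Walks per-row byte sizes and returns the number of rows in each row group.
pub fn plan_row_groups(limits: RowGroupLimits, row_sizes: &[usize]) -> Vec<usize> {
    let mut groups = Vec::new();
    let (mut rows, mut bytes) = (0usize, 0usize);
    for &size in row_sizes {
        rows += 1;
        bytes += size;
        if should_flush(limits, rows, bytes) {
            groups.push(rows);
            rows = 0;
            bytes = 0;
        }
    }
    // Whatever is still buffered at the end becomes the final row group.
    if rows > 0 {
        groups.push(rows);
    }
    groups
}
```

With `max_rows: 4` and `max_bytes: Some(100)`, two 60-byte rows close the first group on the byte limit, while a run of 10-byte rows closes the next group on the row limit. With `max_bytes: None`, only the row count matters.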
1 parent 2ec0ab5 commit 84bc876

14 files changed

Lines changed: 427 additions & 8 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 125 additions & 1 deletion
@@ -803,6 +803,73 @@ impl ParquetCdcOptions {
     }
 }

+/// Target maximum size of a Parquet row group in bytes.
+///
+/// Wraps a `usize` so the "must be greater than zero" constraint (arrow-rs
+/// panics on a zero byte limit) is validated when the config is set, rather
+/// than when the writer properties are built.
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub struct MaxRowGroupBytes(usize);
+
+impl MaxRowGroupBytes {
+    /// Creates a `MaxRowGroupBytes`, rejecting zero.
+    pub fn try_new(value: usize) -> Result<Self> {
+        if value == 0 {
+            return Err(DataFusionError::Configuration(
+                "max_row_group_bytes must be greater than 0".to_string(),
+            ));
+        }
+        Ok(Self(value))
+    }
+
+    /// Returns the configured byte limit.
+    pub fn get(&self) -> usize {
+        self.0
+    }
+}
+
+impl FromStr for MaxRowGroupBytes {
+    type Err = DataFusionError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let value = s.parse::<usize>().map_err(|_| {
+            DataFusionError::Configuration(format!(
+                "Invalid max_row_group_bytes: '{s}'. Expected a positive integer."
+            ))
+        })?;
+        Self::try_new(value)
+    }
+}
+
+impl Display for MaxRowGroupBytes {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.0)
+    }
+}
+
+/// `ConfigField` for `Option<MaxRowGroupBytes>`. A custom impl (rather than the
+/// blanket `Option<F>` one) so an invalid value is rejected without leaving the
+/// option in an invalid intermediate state on error. `MaxRowGroupBytes`
+/// deliberately does not implement `Default`, so the blanket impl does not apply.
+impl ConfigField for Option<MaxRowGroupBytes> {
+    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
+        match self {
+            Some(s) => v.some(key, s, description),
+            None => v.none(key, description),
+        }
+    }
+
+    fn set(&mut self, _key: &str, value: &str) -> Result<()> {
+        *self = Some(MaxRowGroupBytes::from_str(value)?);
+        Ok(())
+    }
+
+    fn reset(&mut self, _key: &str) -> Result<()> {
+        *self = None;
+        Ok(())
+    }
+}
+
 config_namespace! {
     /// Options for reading and writing parquet files
     ///
@@ -936,9 +1003,21 @@ config_namespace! {

         /// (writing) Target maximum number of rows in each row group (defaults to 1M
         /// rows). Writing larger row groups requires more memory to write, but
-        /// can get better compression and be faster to read.
+        /// can get better compression and be faster to read. When
+        /// `max_row_group_bytes` is also set, the writer flushes a row group when
+        /// either limit is reached, whichever comes first.
         pub max_row_group_size: usize, default = 1024 * 1024

+        /// (writing) Target maximum size of each row group in bytes. When set,
+        /// the writer flushes whenever either this limit or `max_row_group_size`
+        /// is reached, whichever comes first. Useful for bounding writer memory
+        /// on wide schemas where a row-count limit can map to very different
+        /// byte sizes. Matches the behavior of `parquet.block.size` in
+        /// parquet-mr. If `None` (the default), only the row-count limit
+        /// applies. Currently only honored when `allow_single_file_parallelism`
+        /// is `false`; by default the parallel file writer ignores this limit.
+        pub max_row_group_bytes: Option<MaxRowGroupBytes>, default = None
+
         /// (writing) Sets "created by" property
         pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()

@@ -4081,4 +4160,49 @@ mod tests {
         assert_eq!(cdc.max_chunk_size, 1024 * 1024);
         assert_eq!(cdc.norm_level, 0);
     }
+
+    #[test]
+    fn max_row_group_bytes_rejects_zero() {
+        use crate::config::MaxRowGroupBytes;
+        use std::str::FromStr;
+
+        assert!(MaxRowGroupBytes::try_new(0).is_err());
+        assert!(MaxRowGroupBytes::from_str("0").is_err());
+        assert!(MaxRowGroupBytes::from_str("not_a_number").is_err());
+        assert_eq!(MaxRowGroupBytes::try_new(128).unwrap().get(), 128);
+        assert_eq!(MaxRowGroupBytes::from_str("128").unwrap().get(), 128);
+    }
+
+    #[test]
+    fn parquet_max_row_group_bytes_config_set_rejects_zero() {
+        use crate::config::ConfigOptions;
+
+        let mut options = ConfigOptions::new();
+        options
+            .set("datafusion.execution.parquet.max_row_group_bytes", "1024")
+            .unwrap();
+        assert_eq!(
+            options
+                .execution
+                .parquet
+                .max_row_group_bytes
+                .map(|v| v.get()),
+            Some(1024)
+        );
+
+        // Zero is rejected at set time, leaving the previous value unchanged.
+        assert!(
+            options
+                .set("datafusion.execution.parquet.max_row_group_bytes", "0")
+                .is_err()
+        );
+        assert_eq!(
+            options
+                .execution
+                .parquet
+                .max_row_group_bytes
+                .map(|v| v.get()),
+            Some(1024)
+        );
+    }
 }
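The "rejected at set time, leaving the previous value unchanged" behavior exercised by the test above follows from parsing before assigning. A minimal std-only sketch of that pattern is shown here; `PositiveBytes` and `set_limit` are hypothetical stand-ins for `MaxRowGroupBytes` and `ConfigField::set`, and plain `String` errors replace `DataFusionError` so the example compiles on its own.

```rust
use std::str::FromStr;

/// Hypothetical stand-in for `MaxRowGroupBytes`: a byte count that is never zero.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct PositiveBytes(usize);

impl PositiveBytes {
    /// Rejects zero so an invalid limit can never be constructed.
    pub fn try_new(value: usize) -> Result<Self, String> {
        if value == 0 {
            return Err("byte limit must be greater than 0".to_string());
        }
        Ok(Self(value))
    }

    /// Returns the wrapped byte limit.
    pub fn get(self) -> usize {
        self.0
    }
}

impl FromStr for PositiveBytes {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let value = s
            .parse::<usize>()
            .map_err(|_| format!("invalid byte limit: '{s}'"))?;
        Self::try_new(value)
    }
}

/// Parses first and assigns second: on error the `?` returns before `slot`
/// is touched, so the previously stored value survives a bad input.
pub fn set_limit(slot: &mut Option<PositiveBytes>, raw: &str) -> Result<(), String> {
    let parsed = raw.parse::<PositiveBytes>()?;
    *slot = Some(parsed);
    Ok(())
}
```

Setting `"1024"` and then attempting `"0"` or `"abc"` returns an error each time and leaves the slot at 1024.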

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 28 additions & 1 deletion
@@ -219,6 +219,7 @@ impl ParquetOptions {
             dictionary_page_size_limit,
             statistics_enabled,
             max_row_group_size,
+            max_row_group_bytes,
             created_by,
             column_index_truncate_length,
             statistics_truncate_length,
@@ -261,6 +262,7 @@ impl ParquetOptions {
                     .unwrap_or(DEFAULT_STATISTICS_ENABLED),
             )
             .set_max_row_group_row_count(Some(*max_row_group_size))
+            .set_max_row_group_bytes(max_row_group_bytes.as_ref().map(|v| v.get()))
             .set_created_by(created_by.clone())
             .set_column_index_truncate_length(*column_index_truncate_length)
             .set_statistics_truncate_length(*statistics_truncate_length)
@@ -428,7 +430,8 @@ mod tests {
     #[cfg(feature = "parquet_encryption")]
     use crate::config::ConfigFileEncryptionProperties;
     use crate::config::{
-        ParquetCdcOptions, ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions,
+        MaxRowGroupBytes, ParquetCdcOptions, ParquetColumnOptions,
+        ParquetEncryptionOptions, ParquetOptions,
     };
     use crate::parquet_config::DFParquetWriterVersion;
     use parquet::basic::Compression;
@@ -473,6 +476,7 @@ mod tests {
             dictionary_page_size_limit: 42,
             statistics_enabled: Some("chunk".into()),
             max_row_group_size: 42,
+            max_row_group_bytes: Some(MaxRowGroupBytes::try_new(42).unwrap()),
             created_by: "wordy".into(),
             column_index_truncate_length: Some(42),
             statistics_truncate_length: Some(42),
@@ -582,6 +586,9 @@ mod tests {
                 max_row_group_size: props
                     .max_row_group_row_count()
                     .unwrap_or(DEFAULT_MAX_ROW_GROUP_ROW_COUNT),
+                max_row_group_bytes: props
+                    .max_row_group_bytes()
+                    .and_then(|v| MaxRowGroupBytes::try_new(v).ok()),
                 created_by: props.created_by().to_string(),
                 column_index_truncate_length: props.column_index_truncate_length(),
                 statistics_truncate_length: props.statistics_truncate_length(),
@@ -895,6 +902,26 @@ mod tests {
         assert_eq!(cdc.norm_level, -1);
     }

+    #[test]
+    fn test_max_row_group_bytes_disabled_by_default() {
+        let mut opts = TableParquetOptions::default();
+        opts.arrow_schema(&Arc::new(Schema::empty()));
+
+        let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build();
+        assert_eq!(props.max_row_group_bytes(), None);
+    }
+
+    #[test]
+    fn test_max_row_group_bytes_propagated_to_writer_props() {
+        let mut opts = TableParquetOptions::default();
+        opts.global.max_row_group_bytes =
+            Some(MaxRowGroupBytes::try_new(64 * 1024 * 1024).unwrap());
+        opts.arrow_schema(&Arc::new(Schema::empty()));
+
+        let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build();
+        assert_eq!(props.max_row_group_bytes(), Some(64 * 1024 * 1024));
+    }
+
     #[test]
     fn test_bloom_filter_set_ndv_only() {
         // the TableParquetOptions::default, with only ndv set

datafusion/proto-common/proto/datafusion_common.proto

Lines changed: 4 additions & 0 deletions
@@ -627,6 +627,10 @@ message ParquetOptions {
     uint64 max_predicate_cache_size = 33;
   }

+  oneof max_row_group_bytes_opt {
+    uint64 max_row_group_bytes = 37;
+  }
+
   ParquetCdcOptions content_defined_chunking = 35;

   // Optional timezone applied to INT96-coerced timestamps when `coerce_int96`

datafusion/proto-common/src/from_proto/mod.rs

Lines changed: 21 additions & 3 deletions
@@ -39,8 +39,8 @@ use datafusion_common::{
     DataFusionError, JoinSide, ScalarValue, Statistics, TableReference,
     arrow_datafusion_err,
     config::{
-        CsvOptions, JsonOptions, ParquetCdcOptions, ParquetColumnOptions, ParquetOptions,
-        TableParquetOptions,
+        CsvOptions, JsonOptions, MaxRowGroupBytes, ParquetCdcOptions,
+        ParquetColumnOptions, ParquetOptions, TableParquetOptions,
     },
     file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
     parsers::CompressionTypeVariant,
@@ -1130,6 +1130,9 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
             max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt {
                 protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize),
             }).unwrap_or(None),
+            max_row_group_bytes: value.max_row_group_bytes_opt.and_then(|opt| match opt {
+                protobuf::parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(v) => MaxRowGroupBytes::try_new(v as usize).ok(),
+            }),
             content_defined_chunking: value.content_defined_chunking.map(ParquetCdcOptions::from).unwrap_or_default(),
         })
     }
@@ -1331,7 +1334,7 @@ pub(crate) fn csv_writer_options_from_proto(
 #[cfg(test)]
 mod tests {
     use datafusion_common::config::{
-        ParquetCdcOptions, ParquetOptions, TableParquetOptions,
+        MaxRowGroupBytes, ParquetCdcOptions, ParquetOptions, TableParquetOptions,
     };

     fn parquet_options_proto_round_trip(opts: ParquetOptions) -> ParquetOptions {
@@ -1376,6 +1379,21 @@ mod tests {
         assert_eq!(recovered.coerce_int96_tz, Some("UTC".to_string()));
     }

+    #[test]
+    fn test_parquet_options_max_row_group_bytes_round_trip() {
+        let opts = ParquetOptions {
+            max_row_group_bytes: Some(
+                MaxRowGroupBytes::try_new(64 * 1024 * 1024).unwrap(),
+            ),
+            ..ParquetOptions::default()
+        };
+        let recovered = parquet_options_proto_round_trip(opts.clone());
+        assert_eq!(
+            recovered.max_row_group_bytes.map(|v| v.get()),
+            Some(64 * 1024 * 1024)
+        );
+    }
+
     #[test]
     fn test_table_parquet_options_coerce_int96_tz_round_trip() {
         let mut opts = TableParquetOptions::default();

datafusion/proto-common/src/generated/pbjson.rs

Lines changed: 24 additions & 0 deletions
@@ -6448,6 +6448,9 @@ impl serde::Serialize for ParquetOptions {
         if self.max_predicate_cache_size_opt.is_some() {
             len += 1;
         }
+        if self.max_row_group_bytes_opt.is_some() {
+            len += 1;
+        }
         if self.coerce_int96_tz_opt.is_some() {
             len += 1;
         }
@@ -6619,6 +6622,15 @@ impl serde::Serialize for ParquetOptions {
                 }
             }
         }
+        if let Some(v) = self.max_row_group_bytes_opt.as_ref() {
+            match v {
+                parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(v) => {
+                    #[allow(clippy::needless_borrow)]
+                    #[allow(clippy::needless_borrows_for_generic_args)]
+                    struct_ser.serialize_field("maxRowGroupBytes", ToString::to_string(&v).as_str())?;
+                }
+            }
+        }
         if let Some(v) = self.coerce_int96_tz_opt.as_ref() {
             match v {
                 parquet_options::CoerceInt96TzOpt::CoerceInt96Tz(v) => {
@@ -6699,6 +6711,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
             "coerceInt96",
             "max_predicate_cache_size",
             "maxPredicateCacheSize",
+            "max_row_group_bytes",
+            "maxRowGroupBytes",
             "coerce_int96_tz",
             "coerceInt96Tz",
         ];
@@ -6738,6 +6752,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
             BloomFilterNdv,
             CoerceInt96,
             MaxPredicateCacheSize,
+            MaxRowGroupBytes,
             CoerceInt96Tz,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
@@ -6793,6 +6808,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                             "bloomFilterNdv" | "bloom_filter_ndv" => Ok(GeneratedField::BloomFilterNdv),
                             "coerceInt96" | "coerce_int96" => Ok(GeneratedField::CoerceInt96),
                             "maxPredicateCacheSize" | "max_predicate_cache_size" => Ok(GeneratedField::MaxPredicateCacheSize),
+                            "maxRowGroupBytes" | "max_row_group_bytes" => Ok(GeneratedField::MaxRowGroupBytes),
                             "coerceInt96Tz" | "coerce_int96_tz" => Ok(GeneratedField::CoerceInt96Tz),
                             _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
                         }
@@ -6846,6 +6862,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                 let mut bloom_filter_ndv_opt__ = None;
                 let mut coerce_int96_opt__ = None;
                 let mut max_predicate_cache_size_opt__ = None;
+                let mut max_row_group_bytes_opt__ = None;
                 let mut coerce_int96_tz_opt__ = None;
                 while let Some(k) = map_.next_key()? {
                     match k {
@@ -7061,6 +7078,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                             }
                             max_predicate_cache_size_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(x.0));
                         }
+                        GeneratedField::MaxRowGroupBytes => {
+                            if max_row_group_bytes_opt__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("maxRowGroupBytes"));
+                            }
+                            max_row_group_bytes_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(x.0));
+                        }
                         GeneratedField::CoerceInt96Tz => {
                             if coerce_int96_tz_opt__.is_some() {
                                 return Err(serde::de::Error::duplicate_field("coerceInt96Tz"));
@@ -7103,6 +7126,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                     bloom_filter_ndv_opt: bloom_filter_ndv_opt__,
                     coerce_int96_opt: coerce_int96_opt__,
                     max_predicate_cache_size_opt: max_predicate_cache_size_opt__,
+                    max_row_group_bytes_opt: max_row_group_bytes_opt__,
                     coerce_int96_tz_opt: coerce_int96_tz_opt__,
                 })
             }

datafusion/proto-common/src/generated/prost.rs

Lines changed: 9 additions & 0 deletions
@@ -900,6 +900,10 @@ pub struct ParquetOptions {
     pub max_predicate_cache_size_opt: ::core::option::Option<
         parquet_options::MaxPredicateCacheSizeOpt,
     >,
+    #[prost(oneof = "parquet_options::MaxRowGroupBytesOpt", tags = "37")]
+    pub max_row_group_bytes_opt: ::core::option::Option<
+        parquet_options::MaxRowGroupBytesOpt,
+    >,
     /// Optional timezone applied to INT96-coerced timestamps when `coerce_int96`
     /// is set. When `Some`, INT96 columns coerce to
     /// `Timestamp(<coerce_int96>, Some(<tz>))` instead of the default
@@ -964,6 +968,11 @@ pub mod parquet_options {
         #[prost(uint64, tag = "33")]
         MaxPredicateCacheSize(u64),
     }
+    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)]
+    pub enum MaxRowGroupBytesOpt {
+        #[prost(uint64, tag = "37")]
+        MaxRowGroupBytes(u64),
+    }
     /// Optional timezone applied to INT96-coerced timestamps when `coerce_int96`
     /// is set. When `Some`, INT96 columns coerce to
     /// `Timestamp(<coerce_int96>, Some(<tz>))` instead of the default

datafusion/proto-common/src/to_proto/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -938,6 +938,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
             coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
             coerce_int96_tz_opt: value.coerce_int96_tz.clone().map(protobuf::parquet_options::CoerceInt96TzOpt::CoerceInt96Tz),
             max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)),
+            max_row_group_bytes_opt: value.max_row_group_bytes.map(|v| protobuf::parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(v.get() as u64)),
             content_defined_chunking: Some((&value.content_defined_chunking).into()),
         })
     }
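The optional-field round trip in the `to_proto` / `from_proto` conversions can be sketched without prost. In this sketch `WireMaxRowGroupBytes`, `to_wire`, and `from_wire` are hypothetical names standing in for the generated oneof wrapper and the two `TryFrom` impls, and the standard library's `NonZeroUsize` is swapped in for the commit's `MaxRowGroupBytes` newtype. As in the diff, a zero found on the wire decodes to `None` instead of raising an error.

```rust
use std::num::NonZeroUsize;

/// Hypothetical stand-in for the generated `MaxRowGroupBytesOpt` oneof wrapper.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WireMaxRowGroupBytes {
    MaxRowGroupBytes(u64),
}

/// Encodes the optional limit: `None` stays absent, `Some` widens to `u64`.
pub fn to_wire(limit: Option<NonZeroUsize>) -> Option<WireMaxRowGroupBytes> {
    limit.map(|v| WireMaxRowGroupBytes::MaxRowGroupBytes(v.get() as u64))
}

/// Decodes the optional limit. `NonZeroUsize::new` returns `None` for zero,
/// so an invalid wire value is dropped rather than reported.
pub fn from_wire(wire: Option<WireMaxRowGroupBytes>) -> Option<NonZeroUsize> {
    wire.and_then(|opt| match opt {
        WireMaxRowGroupBytes::MaxRowGroupBytes(v) => NonZeroUsize::new(v as usize),
    })
}
```

Wrapping the scalar in a oneof is what lets the wire format distinguish "unset" from any concrete value. A bare proto3 `uint64` defaults to 0, which here is both the default and an invalid limit.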
