Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
291 changes: 209 additions & 82 deletions datafusion/proto/src/logical_plan/file_formats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use std::sync::Arc;

use super::LogicalExtensionCodec;
use crate::convert::FromProto;
use crate::convert::{FromProto, TryFromProto};
use crate::protobuf::{
CsvOptions as CsvOptionsProto, CsvQuoteStyle as CsvQuoteStyleProto,
JsonOptions as JsonOptionsProto,
Expand Down Expand Up @@ -500,81 +500,145 @@ mod parquet {
}
}

impl FromProto<&ParquetOptionsProto> for ParquetOptions {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dunno how much we care about maintaining compatibility here; I guess we could keep FromProto impl and just defer to the try version with an unwrap 🤔

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept this on TryFromProto only rather than reintroducing an infallible FromProto wrapper. I checked the local call sites and there aren’t any remaining users of the old parquet FromProto impl, and keeping an unwrap-based wrapper would preserve the panic path this PR is trying to remove.

For the empty writer_version case, I kept the defaulting because this is specifically the proto3 omitted-field case: absent strings decode as "", while the schema documents the logical default for writer_version as "1.0". So empty/omitted preserves compatibility, while non-empty invalid values still return an error.

fn from_proto(proto: &ParquetOptionsProto) -> Self {
ParquetOptions {
enable_page_index: proto.enable_page_index,
pruning: proto.pruning,
skip_metadata: proto.skip_metadata,
metadata_size_hint: proto.metadata_size_hint_opt.as_ref().map(|opt| match opt {
parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => *size as usize,
}),
pushdown_filters: proto.pushdown_filters,
reorder_filters: proto.reorder_filters,
force_filter_selections: proto.force_filter_selections,
data_pagesize_limit: proto.data_pagesize_limit as usize,
write_batch_size: proto.write_batch_size as usize,
// TODO: Consider changing to TryFrom to avoid panic on invalid proto data
writer_version: proto.writer_version.parse().expect("
Invalid parquet writer version in proto, expected '1.0' or '2.0'
"),
compression: proto.compression_opt.as_ref().map(|opt| match opt {
parquet_options::CompressionOpt::Compression(compression) => compression.clone(),
}),
dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(|opt| match opt {
parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled) => *enabled,
}),
dictionary_page_size_limit: proto.dictionary_page_size_limit as usize,
statistics_enabled: proto.statistics_enabled_opt.as_ref().map(|opt| match opt {
parquet_options::StatisticsEnabledOpt::StatisticsEnabled(statistics) => statistics.clone(),
}),
max_row_group_size: proto.max_row_group_size as usize,
created_by: proto.created_by.clone(),
column_index_truncate_length: proto.column_index_truncate_length_opt.as_ref().map(|opt| match opt {
parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize,
}),
statistics_truncate_length: proto.statistics_truncate_length_opt.as_ref().map(|opt| match opt {
parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize,
}),
data_page_row_count_limit: proto.data_page_row_count_limit as usize,
encoding: proto.encoding_opt.as_ref().map(|opt| match opt {
parquet_options::EncodingOpt::Encoding(encoding) => encoding.clone(),
}),
bloom_filter_on_read: proto.bloom_filter_on_read,
bloom_filter_on_write: proto.bloom_filter_on_write,
bloom_filter_fpp: proto.bloom_filter_fpp_opt.as_ref().map(|opt| match opt {
parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) => *fpp,
}),
bloom_filter_ndv: proto.bloom_filter_ndv_opt.as_ref().map(|opt| match opt {
parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) => *ndv,
}),
allow_single_file_parallelism: proto.allow_single_file_parallelism,
maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers as usize,
maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize,
schema_force_view_types: proto.schema_force_view_types,
binary_as_string: proto.binary_as_string,
skip_arrow_metadata: proto.skip_arrow_metadata,
coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt {
parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(),
}),
coerce_int96_tz: proto.coerce_int96_tz_opt.as_ref().map(|opt| match opt {
parquet_options::CoerceInt96TzOpt::CoerceInt96Tz(tz) => tz.clone(),
}),
max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt {
parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize,
}),
use_content_defined_chunking: proto.content_defined_chunking.map(|cdc| {
let defaults = CdcOptions::default();
CdcOptions {
// proto3 uses 0 as the wire default for uint64; a zero chunk size is
// invalid, so treat it as "field not set" and fall back to the default.
min_chunk_size: if cdc.min_chunk_size != 0 { cdc.min_chunk_size as usize } else { defaults.min_chunk_size },
max_chunk_size: if cdc.max_chunk_size != 0 { cdc.max_chunk_size as usize } else { defaults.max_chunk_size },
// norm_level = 0 is a valid value (and the default), so pass it through directly.
norm_level: cdc.norm_level,
}
}),
}
impl TryFromProto<&ParquetOptionsProto> for ParquetOptions {
type Error = datafusion_common::DataFusionError;

fn try_from_proto(
proto: &ParquetOptionsProto,
) -> datafusion_common::Result<Self, Self::Error> {
let writer_version = match proto.writer_version.as_str() {
// Proto3 decodes an omitted string field as the empty string. The
// schema documents writer_version's logical default as "1.0", so
// preserve that default when the field is absent on the wire.
"" => ParquetOptions::default().writer_version,
version => version.parse()?,
};

Ok(ParquetOptions {
enable_page_index: proto.enable_page_index,
pruning: proto.pruning,
skip_metadata: proto.skip_metadata,
metadata_size_hint: proto
.metadata_size_hint_opt
.as_ref()
.map(|opt| match opt {
parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => {
*size as usize
}
}),
pushdown_filters: proto.pushdown_filters,
reorder_filters: proto.reorder_filters,
force_filter_selections: proto.force_filter_selections,
data_pagesize_limit: proto.data_pagesize_limit as usize,
write_batch_size: proto.write_batch_size as usize,
writer_version,
compression: proto.compression_opt.as_ref().map(|opt| match opt {
parquet_options::CompressionOpt::Compression(compression) => {
compression.clone()
}
}),
dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(|opt| {
match opt {
parquet_options::DictionaryEnabledOpt::DictionaryEnabled(
enabled,
) => *enabled,
}
}),
dictionary_page_size_limit: proto.dictionary_page_size_limit as usize,
statistics_enabled: proto.statistics_enabled_opt.as_ref().map(
|opt| match opt {
parquet_options::StatisticsEnabledOpt::StatisticsEnabled(
statistics,
) => statistics.clone(),
},
),
max_row_group_size: proto.max_row_group_size as usize,
created_by: proto.created_by.clone(),
column_index_truncate_length: proto
.column_index_truncate_length_opt
.as_ref()
.map(|opt| match opt {
parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize,
}),
statistics_truncate_length: proto
.statistics_truncate_length_opt
.as_ref()
.map(|opt| match opt {
parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize,
}),
data_page_row_count_limit: proto.data_page_row_count_limit as usize,
encoding: proto.encoding_opt.as_ref().map(|opt| match opt {
parquet_options::EncodingOpt::Encoding(encoding) => {
encoding.clone()
}
}),
bloom_filter_on_read: proto.bloom_filter_on_read,
bloom_filter_on_write: proto.bloom_filter_on_write,
bloom_filter_fpp: proto
.bloom_filter_fpp_opt
.as_ref()
.map(|opt| match opt {
parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) => *fpp,
}),
bloom_filter_ndv: proto
.bloom_filter_ndv_opt
.as_ref()
.map(|opt| match opt {
parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) => *ndv,
}),
allow_single_file_parallelism: proto.allow_single_file_parallelism,
maximum_parallel_row_group_writers: proto
.maximum_parallel_row_group_writers
as usize,
maximum_buffered_record_batches_per_stream: proto
.maximum_buffered_record_batches_per_stream
as usize,
schema_force_view_types: proto.schema_force_view_types,
binary_as_string: proto.binary_as_string,
skip_arrow_metadata: proto.skip_arrow_metadata,
coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt {
parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => {
coerce_int96.clone()
}
}),
coerce_int96_tz: proto
.coerce_int96_tz_opt
.as_ref()
.map(|opt| match opt {
parquet_options::CoerceInt96TzOpt::CoerceInt96Tz(tz) => {
tz.clone()
}
}),
max_predicate_cache_size: proto
.max_predicate_cache_size_opt
.as_ref()
.map(|opt| match opt {
parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(
size,
) => *size as usize,
}),
use_content_defined_chunking: proto.content_defined_chunking.map(
|cdc| {
let defaults = CdcOptions::default();
CdcOptions {
// proto3 uses 0 as the wire default for uint64; a zero chunk size is
// invalid, so treat it as "field not set" and fall back to the default.
min_chunk_size: if cdc.min_chunk_size != 0 {
cdc.min_chunk_size as usize
} else {
defaults.min_chunk_size
},
max_chunk_size: if cdc.max_chunk_size != 0 {
cdc.max_chunk_size as usize
} else {
defaults.max_chunk_size
},
// norm_level = 0 is a valid value (and the default), so pass it through directly.
norm_level: cdc.norm_level,
}
},
),
})
}
}

Expand Down Expand Up @@ -606,13 +670,18 @@ mod parquet {
}
}

impl FromProto<&TableParquetOptionsProto> for TableParquetOptions {
fn from_proto(proto: &TableParquetOptionsProto) -> Self {
TableParquetOptions {
impl TryFromProto<&TableParquetOptionsProto> for TableParquetOptions {
type Error = datafusion_common::DataFusionError;

fn try_from_proto(
proto: &TableParquetOptionsProto,
) -> datafusion_common::Result<Self, Self::Error> {
Ok(TableParquetOptions {
global: proto
.global
.as_ref()
.map(ParquetOptions::from_proto)
.map(ParquetOptions::try_from_proto)
.transpose()?
.unwrap_or_default(),
column_specific_options: proto
.column_specific_options
Expand All @@ -635,7 +704,7 @@ mod parquet {
.map(|(k, v)| (k.clone(), Some(v.clone())))
.collect(),
..Default::default()
}
})
}
}

Expand Down Expand Up @@ -689,7 +758,7 @@ mod parquet {
let proto = TableParquetOptionsProto::decode(buf).map_err(|e| {
exec_datafusion_err!("Failed to decode TableParquetOptionsProto: {e:?}")
})?;
let options = TableParquetOptions::from_proto(&proto);
let options = TableParquetOptions::try_from_proto(&proto)?;
Ok(Arc::new(
datafusion_datasource_parquet::file_format::ParquetFormatFactory {
options: Some(options),
Expand Down Expand Up @@ -723,6 +792,64 @@ mod parquet {
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;

fn encode_table_options(proto: TableParquetOptionsProto) -> Vec<u8> {
let mut buf = Vec::new();
proto.encode(&mut buf).expect("encode parquet options");
buf
}

#[test]
fn try_decode_file_format_errors_on_invalid_writer_version() {
let proto = TableParquetOptionsProto {
global: Some(ParquetOptionsProto {
writer_version: "3.0".to_string(),
..Default::default()
}),
..Default::default()
};

let result = ParquetLogicalExtensionCodec.try_decode_file_format(
&encode_table_options(proto),
&TaskContext::default(),
);

let err = result.expect_err("invalid writer version should error");
assert!(
err.to_string()
.contains("Invalid parquet writer version: 3.0"),
"{err}"
);
}

#[test]
fn try_decode_file_format_defaults_empty_writer_version() {
let proto = TableParquetOptionsProto {
global: Some(ParquetOptionsProto::default()),
..Default::default()
};

let factory = ParquetLogicalExtensionCodec
.try_decode_file_format(
&encode_table_options(proto),
&TaskContext::default(),
)
.expect("decode parquet options");
let parquet_factory = factory
.downcast_ref::<ParquetFormatFactory>()
.expect("parquet format factory");
let options = parquet_factory.options.as_ref().expect("parquet options");

assert_eq!(
options.global.writer_version,
ParquetOptions::default().writer_version
);
}
}
}
#[cfg(feature = "parquet")]
pub use parquet::ParquetLogicalExtensionCodec;
Expand Down