Skip to content

Commit 60b6435

Browse files
committed
fix parquet proto writer version decode
1 parent 4a41173 commit 60b6435

1 file changed

Lines changed: 208 additions & 82 deletions

File tree

datafusion/proto/src/logical_plan/file_formats.rs

Lines changed: 208 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use std::sync::Arc;
1919

2020
use super::LogicalExtensionCodec;
21-
use crate::convert::FromProto;
21+
use crate::convert::{FromProto, TryFromProto};
2222
use crate::protobuf::{
2323
CsvOptions as CsvOptionsProto, CsvQuoteStyle as CsvQuoteStyleProto,
2424
JsonOptions as JsonOptionsProto,
@@ -500,81 +500,144 @@ mod parquet {
500500
}
501501
}
502502

503-
impl FromProto<&ParquetOptionsProto> for ParquetOptions {
504-
fn from_proto(proto: &ParquetOptionsProto) -> Self {
505-
ParquetOptions {
506-
enable_page_index: proto.enable_page_index,
507-
pruning: proto.pruning,
508-
skip_metadata: proto.skip_metadata,
509-
metadata_size_hint: proto.metadata_size_hint_opt.as_ref().map(|opt| match opt {
510-
parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => *size as usize,
511-
}),
512-
pushdown_filters: proto.pushdown_filters,
513-
reorder_filters: proto.reorder_filters,
514-
force_filter_selections: proto.force_filter_selections,
515-
data_pagesize_limit: proto.data_pagesize_limit as usize,
516-
write_batch_size: proto.write_batch_size as usize,
517-
// TODO: Consider changing to TryFrom to avoid panic on invalid proto data
518-
writer_version: proto.writer_version.parse().expect("
519-
Invalid parquet writer version in proto, expected '1.0' or '2.0'
520-
"),
521-
compression: proto.compression_opt.as_ref().map(|opt| match opt {
522-
parquet_options::CompressionOpt::Compression(compression) => compression.clone(),
523-
}),
524-
dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(|opt| match opt {
525-
parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled) => *enabled,
526-
}),
527-
dictionary_page_size_limit: proto.dictionary_page_size_limit as usize,
528-
statistics_enabled: proto.statistics_enabled_opt.as_ref().map(|opt| match opt {
529-
parquet_options::StatisticsEnabledOpt::StatisticsEnabled(statistics) => statistics.clone(),
530-
}),
531-
max_row_group_size: proto.max_row_group_size as usize,
532-
created_by: proto.created_by.clone(),
533-
column_index_truncate_length: proto.column_index_truncate_length_opt.as_ref().map(|opt| match opt {
534-
parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize,
535-
}),
536-
statistics_truncate_length: proto.statistics_truncate_length_opt.as_ref().map(|opt| match opt {
537-
parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize,
538-
}),
539-
data_page_row_count_limit: proto.data_page_row_count_limit as usize,
540-
encoding: proto.encoding_opt.as_ref().map(|opt| match opt {
541-
parquet_options::EncodingOpt::Encoding(encoding) => encoding.clone(),
542-
}),
543-
bloom_filter_on_read: proto.bloom_filter_on_read,
544-
bloom_filter_on_write: proto.bloom_filter_on_write,
545-
bloom_filter_fpp: proto.bloom_filter_fpp_opt.as_ref().map(|opt| match opt {
546-
parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) => *fpp,
547-
}),
548-
bloom_filter_ndv: proto.bloom_filter_ndv_opt.as_ref().map(|opt| match opt {
549-
parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) => *ndv,
550-
}),
551-
allow_single_file_parallelism: proto.allow_single_file_parallelism,
552-
maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers as usize,
553-
maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize,
554-
schema_force_view_types: proto.schema_force_view_types,
555-
binary_as_string: proto.binary_as_string,
556-
skip_arrow_metadata: proto.skip_arrow_metadata,
557-
coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt {
558-
parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(),
559-
}),
560-
coerce_int96_tz: proto.coerce_int96_tz_opt.as_ref().map(|opt| match opt {
561-
parquet_options::CoerceInt96TzOpt::CoerceInt96Tz(tz) => tz.clone(),
562-
}),
563-
max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt {
564-
parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize,
565-
}),
566-
use_content_defined_chunking: proto.content_defined_chunking.map(|cdc| {
567-
let defaults = CdcOptions::default();
568-
CdcOptions {
569-
// proto3 uses 0 as the wire default for uint64; a zero chunk size is
570-
// invalid, so treat it as "field not set" and fall back to the default.
571-
min_chunk_size: if cdc.min_chunk_size != 0 { cdc.min_chunk_size as usize } else { defaults.min_chunk_size },
572-
max_chunk_size: if cdc.max_chunk_size != 0 { cdc.max_chunk_size as usize } else { defaults.max_chunk_size },
573-
// norm_level = 0 is a valid value (and the default), so pass it through directly.
574-
norm_level: cdc.norm_level,
575-
}
576-
}),
577-
}
503+
impl TryFromProto<&ParquetOptionsProto> for ParquetOptions {
504+
type Error = datafusion_common::DataFusionError;
505+
506+
fn try_from_proto(
507+
proto: &ParquetOptionsProto,
508+
) -> datafusion_common::Result<Self, Self::Error> {
509+
let default_options = ParquetOptions::default();
510+
let writer_version = if proto.writer_version.is_empty() {
511+
default_options.writer_version
512+
} else {
513+
proto.writer_version.parse()?
514+
};
515+
516+
Ok(ParquetOptions {
517+
enable_page_index: proto.enable_page_index,
518+
pruning: proto.pruning,
519+
skip_metadata: proto.skip_metadata,
520+
metadata_size_hint: proto
521+
.metadata_size_hint_opt
522+
.as_ref()
523+
.map(|opt| match opt {
524+
parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => {
525+
*size as usize
526+
}
527+
}),
528+
pushdown_filters: proto.pushdown_filters,
529+
reorder_filters: proto.reorder_filters,
530+
force_filter_selections: proto.force_filter_selections,
531+
data_pagesize_limit: proto.data_pagesize_limit as usize,
532+
write_batch_size: proto.write_batch_size as usize,
533+
writer_version,
534+
compression: proto.compression_opt.as_ref().map(|opt| match opt {
535+
parquet_options::CompressionOpt::Compression(compression) => {
536+
compression.clone()
537+
}
538+
}),
539+
dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(|opt| {
540+
match opt {
541+
parquet_options::DictionaryEnabledOpt::DictionaryEnabled(
542+
enabled,
543+
) => *enabled,
544+
}
545+
}),
546+
dictionary_page_size_limit: proto.dictionary_page_size_limit as usize,
547+
statistics_enabled: proto.statistics_enabled_opt.as_ref().map(
548+
|opt| match opt {
549+
parquet_options::StatisticsEnabledOpt::StatisticsEnabled(
550+
statistics,
551+
) => statistics.clone(),
552+
},
553+
),
554+
max_row_group_size: proto.max_row_group_size as usize,
555+
created_by: proto.created_by.clone(),
556+
column_index_truncate_length: proto
557+
.column_index_truncate_length_opt
558+
.as_ref()
559+
.map(|opt| match opt {
560+
parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize,
561+
}),
562+
statistics_truncate_length: proto
563+
.statistics_truncate_length_opt
564+
.as_ref()
565+
.map(|opt| match opt {
566+
parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize,
567+
}),
568+
data_page_row_count_limit: proto.data_page_row_count_limit as usize,
569+
encoding: proto.encoding_opt.as_ref().map(|opt| match opt {
570+
parquet_options::EncodingOpt::Encoding(encoding) => {
571+
encoding.clone()
572+
}
573+
}),
574+
bloom_filter_on_read: proto.bloom_filter_on_read,
575+
bloom_filter_on_write: proto.bloom_filter_on_write,
576+
bloom_filter_fpp: proto
577+
.bloom_filter_fpp_opt
578+
.as_ref()
579+
.map(|opt| match opt {
580+
parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) => *fpp,
581+
}),
582+
bloom_filter_ndv: proto
583+
.bloom_filter_ndv_opt
584+
.as_ref()
585+
.map(|opt| match opt {
586+
parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) => *ndv,
587+
}),
588+
allow_single_file_parallelism: proto.allow_single_file_parallelism,
589+
maximum_parallel_row_group_writers: proto
590+
.maximum_parallel_row_group_writers
591+
as usize,
592+
maximum_buffered_record_batches_per_stream: proto
593+
.maximum_buffered_record_batches_per_stream
594+
as usize,
595+
schema_force_view_types: proto.schema_force_view_types,
596+
binary_as_string: proto.binary_as_string,
597+
skip_arrow_metadata: proto.skip_arrow_metadata,
598+
coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt {
599+
parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => {
600+
coerce_int96.clone()
601+
}
602+
}),
603+
coerce_int96_tz: proto
604+
.coerce_int96_tz_opt
605+
.as_ref()
606+
.map(|opt| match opt {
607+
parquet_options::CoerceInt96TzOpt::CoerceInt96Tz(tz) => {
608+
tz.clone()
609+
}
610+
}),
611+
max_predicate_cache_size: proto
612+
.max_predicate_cache_size_opt
613+
.as_ref()
614+
.map(|opt| match opt {
615+
parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(
616+
size,
617+
) => *size as usize,
618+
}),
619+
use_content_defined_chunking: proto.content_defined_chunking.map(
620+
|cdc| {
621+
let defaults = CdcOptions::default();
622+
CdcOptions {
623+
// proto3 uses 0 as the wire default for uint64; a zero chunk size is
624+
// invalid, so treat it as "field not set" and fall back to the default.
625+
min_chunk_size: if cdc.min_chunk_size != 0 {
626+
cdc.min_chunk_size as usize
627+
} else {
628+
defaults.min_chunk_size
629+
},
630+
max_chunk_size: if cdc.max_chunk_size != 0 {
631+
cdc.max_chunk_size as usize
632+
} else {
633+
defaults.max_chunk_size
634+
},
635+
// norm_level = 0 is a valid value (and the default), so pass it through directly.
636+
norm_level: cdc.norm_level,
637+
}
638+
},
639+
),
640+
})
578641
}
579642
}
580643

@@ -606,13 +669,18 @@ mod parquet {
606669
}
607670
}
608671

609-
impl FromProto<&TableParquetOptionsProto> for TableParquetOptions {
610-
fn from_proto(proto: &TableParquetOptionsProto) -> Self {
611-
TableParquetOptions {
672+
impl TryFromProto<&TableParquetOptionsProto> for TableParquetOptions {
673+
type Error = datafusion_common::DataFusionError;
674+
675+
fn try_from_proto(
676+
proto: &TableParquetOptionsProto,
677+
) -> datafusion_common::Result<Self, Self::Error> {
678+
Ok(TableParquetOptions {
612679
global: proto
613680
.global
614681
.as_ref()
615-
.map(ParquetOptions::from_proto)
682+
.map(ParquetOptions::try_from_proto)
683+
.transpose()?
616684
.unwrap_or_default(),
617685
column_specific_options: proto
618686
.column_specific_options
@@ -635,7 +703,7 @@ mod parquet {
635703
.map(|(k, v)| (k.clone(), Some(v.clone())))
636704
.collect(),
637705
..Default::default()
638-
}
706+
})
639707
}
640708
}
641709

@@ -689,7 +757,7 @@ mod parquet {
689757
let proto = TableParquetOptionsProto::decode(buf).map_err(|e| {
690758
exec_datafusion_err!("Failed to decode TableParquetOptionsProto: {e:?}")
691759
})?;
692-
let options = TableParquetOptions::from_proto(&proto);
760+
let options = TableParquetOptions::try_from_proto(&proto)?;
693761
Ok(Arc::new(
694762
datafusion_datasource_parquet::file_format::ParquetFormatFactory {
695763
options: Some(options),
@@ -723,6 +791,64 @@ mod parquet {
723791
Ok(())
724792
}
725793
}
794+
795+
#[cfg(test)]
796+
mod tests {
797+
use super::*;
798+
799+
fn encode_table_options(proto: TableParquetOptionsProto) -> Vec<u8> {
800+
let mut buf = Vec::new();
801+
proto.encode(&mut buf).expect("encode parquet options");
802+
buf
803+
}
804+
805+
#[test]
806+
fn try_decode_file_format_errors_on_invalid_writer_version() {
807+
let proto = TableParquetOptionsProto {
808+
global: Some(ParquetOptionsProto {
809+
writer_version: "3.0".to_string(),
810+
..Default::default()
811+
}),
812+
..Default::default()
813+
};
814+
815+
let result = ParquetLogicalExtensionCodec.try_decode_file_format(
816+
&encode_table_options(proto),
817+
&TaskContext::default(),
818+
);
819+
820+
let err = result.err().expect("invalid writer version should error");
821+
assert!(
822+
err.to_string()
823+
.contains("Invalid parquet writer version: 3.0"),
824+
"{err}"
825+
);
826+
}
827+
828+
#[test]
829+
fn try_decode_file_format_defaults_empty_writer_version() {
830+
let proto = TableParquetOptionsProto {
831+
global: Some(ParquetOptionsProto::default()),
832+
..Default::default()
833+
};
834+
835+
let factory = ParquetLogicalExtensionCodec
836+
.try_decode_file_format(
837+
&encode_table_options(proto),
838+
&TaskContext::default(),
839+
)
840+
.expect("decode parquet options");
841+
let parquet_factory = factory
842+
.downcast_ref::<ParquetFormatFactory>()
843+
.expect("parquet format factory");
844+
let options = parquet_factory.options.as_ref().expect("parquet options");
845+
846+
assert_eq!(
847+
options.global.writer_version,
848+
ParquetOptions::default().writer_version
849+
);
850+
}
851+
}
726852
}
727853
#[cfg(feature = "parquet")]
728854
pub use parquet::ParquetLogicalExtensionCodec;

0 commit comments

Comments
 (0)