Skip to content

Commit c025c48

Browse files
authored
[Parquet]: GH-563: Make path_in_schema optional (#9678)
# Which issue does this PR close? none # Rationale for this change This is a proof of concept implementation for apache/parquet-format#563 # What changes are included in this PR? Since version 57.0.0, this crate has been tolerant of a missing `path_in_schema`. This PR adds options to cease writing the field as well. The option defaults to continuing to write the field. See related discussion on parquet mailing list: https://lists.apache.org/thread/czm2bk45wwtkhhpqxqvmx9dk5wkwk1kt # Are these changes tested? Yes # Are there any user-facing changes? No, this only adds an optional behavior change that defaults to no change # Related PRs - apache/parquet-format#563 - apache/parquet-format#564 - apache/parquet-java#3470
1 parent 13f5f94 commit c025c48

8 files changed

Lines changed: 166 additions & 10 deletions

File tree

parquet/benches/metadata.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ use parquet::file::serialized_reader::ReadOptionsBuilder;
4141
const NUM_COLUMNS: usize = 10_000;
4242
const NUM_ROW_GROUPS: usize = 10;
4343

44-
fn encoded_meta(is_nullable: bool, has_lists: bool) -> Vec<u8> {
44+
fn encoded_meta(is_nullable: bool, has_lists: bool, write_path_in_schema: bool) -> Vec<u8> {
4545
let mut rng = seedable_rng();
4646

4747
let mut column_desc_ptrs: Vec<ColumnDescPtr> = Vec::with_capacity(NUM_COLUMNS);
@@ -143,7 +143,11 @@ fn encoded_meta(is_nullable: bool, has_lists: bool) -> Vec<u8> {
143143
let mut buffer = Vec::with_capacity(1024);
144144
{
145145
let buf = TrackedWrite::new(&mut buffer);
146-
let writer = ParquetMetaDataWriter::new_with_tracked(buf, &metadata);
146+
let mut writer = ParquetMetaDataWriter::new_with_tracked(buf, &metadata);
147+
// use defaults unless `write_path_in_schema` is false
148+
if !write_path_in_schema {
149+
writer = writer.with_write_path_in_schema(write_path_in_schema);
150+
}
147151
writer.finish().unwrap();
148152
}
149153

@@ -233,7 +237,7 @@ fn criterion_benchmark(c: &mut Criterion) {
233237
})
234238
});
235239

236-
let buf: Bytes = black_box(encoded_meta(false, false)).into();
240+
let buf: Bytes = black_box(encoded_meta(false, false, true)).into();
237241
let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
238242
c.bench_function("decode parquet metadata (wide)", |b| {
239243
b.iter(|| {
@@ -275,7 +279,15 @@ fn criterion_benchmark(c: &mut Criterion) {
275279
})
276280
});
277281

278-
let buf: Bytes = black_box(encoded_meta(true, true)).into();
282+
let buf: Bytes = black_box(encoded_meta(false, false, false)).into();
283+
let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
284+
c.bench_function("decode parquet metadata no path_in_schema (wide)", |b| {
285+
b.iter(|| {
286+
ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap();
287+
})
288+
});
289+
290+
let buf: Bytes = black_box(encoded_meta(true, true, true)).into();
279291
c.bench_function("decode parquet metadata w/ size stats (wide)", |b| {
280292
b.iter(|| {
281293
ParquetMetaDataReader::decode_metadata(&buf).unwrap();

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4596,6 +4596,44 @@ mod tests {
45964596
}
45974597
}
45984598

4599+
#[test]
4600+
fn test_arrow_writer_skip_path_in_schema() {
4601+
let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4602+
let file_schema = Arc::new(batch_schema.clone());
4603+
4604+
let batch = RecordBatch::try_new(
4605+
Arc::new(batch_schema),
4606+
vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4607+
)
4608+
.unwrap();
4609+
4610+
// default options should still write path_in_schema
4611+
let skip_options = ArrowWriterOptions::new();
4612+
4613+
let mut buf = Vec::with_capacity(1024);
4614+
let mut writer =
4615+
ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
4616+
writer.write(&batch).unwrap();
4617+
writer.close().unwrap();
4618+
4619+
// override to not write path_in_schema
4620+
let skip_options = ArrowWriterOptions::new().with_properties(
4621+
WriterProperties::builder()
4622+
.set_write_path_in_schema(false)
4623+
.build(),
4624+
);
4625+
4626+
let mut buf2 = Vec::with_capacity(1024);
4627+
let mut writer =
4628+
ArrowWriter::try_new_with_options(&mut buf2, file_schema.clone(), skip_options)
4629+
.unwrap();
4630+
writer.write(&batch).unwrap();
4631+
writer.close().unwrap();
4632+
4633+
// buf2 should be a bit smaller due to lack of path_in_schema
4634+
assert!(buf.len() > buf2.len());
4635+
}
4636+
45994637
#[test]
46004638
fn mismatched_schemas() {
46014639
let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);

parquet/src/bin/parquet-rewrite.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,10 @@ struct Args {
279279
#[clap(long)]
280280
write_page_header_statistics: Option<bool>,
281281

282+
/// Write path_in_schema to the column metadata.
283+
#[clap(long)]
284+
write_path_in_schema: Option<bool>,
285+
282286
/// Sets whether bloom filter is enabled for all columns.
283287
#[clap(long)]
284288
bloom_filter_enabled: Option<bool>,
@@ -406,6 +410,9 @@ fn main() {
406410
if let Some(value) = args.coerce_types {
407411
writer_properties_builder = writer_properties_builder.set_coerce_types(value);
408412
}
413+
if let Some(value) = args.write_path_in_schema {
414+
writer_properties_builder = writer_properties_builder.set_write_path_in_schema(value);
415+
}
409416
if let Some(value) = args.write_batch_size {
410417
writer_properties_builder = writer_properties_builder.set_write_batch_size(value);
411418
}

parquet/src/file/metadata/thrift/mod.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1333,10 +1333,15 @@ pub(super) fn serialize_column_meta_data<W: Write>(
13331333
.encodings()
13341334
.collect::<Vec<_>>()
13351335
.write_thrift_field(w, 2, 1)?;
1336-
let path = column_chunk.column_descr.path().parts();
1337-
let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
1338-
path.write_thrift_field(w, 3, 2)?;
1339-
column_chunk.compression.write_thrift_field(w, 4, 3)?;
1336+
if w.write_path_in_schema() {
1337+
let path = column_chunk.column_descr.path().parts();
1338+
let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
1339+
path.write_thrift_field(w, 3, 2)?;
1340+
column_chunk.compression.write_thrift_field(w, 4, 3)?;
1341+
} else {
1342+
column_chunk.compression.write_thrift_field(w, 4, 2)?;
1343+
}
1344+
13401345
column_chunk.num_values.write_thrift_field(w, 5, 4)?;
13411346
column_chunk
13421347
.total_uncompressed_size
@@ -1406,6 +1411,8 @@ pub(super) fn serialize_column_meta_data<W: Write>(
14061411
pub(super) struct FileMeta<'a> {
14071412
pub(super) file_metadata: &'a crate::file::metadata::FileMetaData,
14081413
pub(super) row_groups: &'a Vec<RowGroupMetaData>,
1414+
// If true, then write the `path_in_schema` field in the ColumnMetaData struct.
1415+
pub(super) write_path_in_schema: bool,
14091416
}
14101417

14111418
// struct FileMetaData {
@@ -1425,6 +1432,8 @@ impl<'a> WriteThrift for FileMeta<'a> {
14251432
// needed for last_field_id w/o encryption
14261433
#[allow(unused_assignments)]
14271434
fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1435+
writer.set_write_path_in_schema(self.write_path_in_schema);
1436+
14281437
self.file_metadata
14291438
.version
14301439
.write_thrift_field(writer, 1, 0)?;

parquet/src/file/metadata/writer.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
6262
created_by: Option<String>,
6363
object_writer: MetadataObjectWriter,
6464
writer_version: i32,
65+
write_path_in_schema: bool,
6566
}
6667

6768
impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
@@ -259,6 +260,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
259260
let file_meta = FileMeta {
260261
file_metadata: &file_metadata,
261262
row_groups: &row_groups,
263+
write_path_in_schema: self.write_path_in_schema,
262264
};
263265

264266
// Write file metadata
@@ -293,6 +295,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
293295
row_groups: Vec<RowGroupMetaData>,
294296
created_by: Option<String>,
295297
writer_version: i32,
298+
write_path_in_schema: bool,
296299
) -> Self {
297300
Self {
298301
buf,
@@ -304,6 +307,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
304307
created_by,
305308
object_writer: Default::default(),
306309
writer_version,
310+
write_path_in_schema,
307311
}
308312
}
309313

@@ -415,6 +419,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
415419
pub struct ParquetMetaDataWriter<'a, W: Write> {
416420
buf: TrackedWrite<W>,
417421
metadata: &'a ParquetMetaData,
422+
write_path_in_schema: bool,
418423
}
419424

420425
impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
@@ -436,7 +441,20 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
436441
///
437442
/// See example on the struct level documentation
438443
pub fn new_with_tracked(buf: TrackedWrite<W>, metadata: &'a ParquetMetaData) -> Self {
439-
Self { buf, metadata }
444+
Self {
445+
buf,
446+
metadata,
447+
write_path_in_schema: true,
448+
}
449+
}
450+
451+
/// Set whether or not to write the `path_in_schema` field in the Thrift `ColumnMetaData`
452+
/// struct.
453+
pub fn with_write_path_in_schema(self, val: bool) -> Self {
454+
Self {
455+
write_path_in_schema: val,
456+
..self
457+
}
440458
}
441459

442460
/// Write the metadata to the buffer
@@ -460,6 +478,7 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
460478
row_groups,
461479
created_by,
462480
file_metadata.version(),
481+
self.write_path_in_schema,
463482
);
464483

465484
if let Some(column_indexes) = column_indexes {

parquet/src/file/properties.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ pub const DEFAULT_OFFSET_INDEX_DISABLED: bool = false;
6969
pub const DEFAULT_COERCE_TYPES: bool = false;
7070
/// Default value for [`WriterProperties::data_page_v2_compression_ratio_threshold`]
7171
pub const DEFAULT_DATA_PAGE_V2_COMPRESSION_RATIO_THRESHOLD: f64 = 1.0;
72+
/// Default value for [`WriterProperties::write_path_in_schema`]
73+
pub const DEFAULT_WRITE_PATH_IN_SCHEMA: bool = true;
7274
/// Default minimum chunk size for content-defined chunking: 256 KiB.
7375
pub const DEFAULT_CDC_MIN_CHUNK_SIZE: usize = 256 * 1024;
7476
/// Default maximum chunk size for content-defined chunking: 1024 KiB.
@@ -252,6 +254,7 @@ pub struct WriterProperties {
252254
statistics_truncate_length: Option<usize>,
253255
coerce_types: bool,
254256
content_defined_chunking: Option<CdcOptions>,
257+
write_path_in_schema: bool,
255258
#[cfg(feature = "encryption")]
256259
pub(crate) file_encryption_properties: Option<Arc<FileEncryptionProperties>>,
257260
}
@@ -437,6 +440,14 @@ impl WriterProperties {
437440
self.coerce_types
438441
}
439442

443+
/// Returns `true` if the `path_in_schema` field of the `ColumnMetaData` Thrift struct
444+
/// should be written.
445+
///
446+
/// For more details see [`WriterPropertiesBuilder::set_write_path_in_schema`]
447+
pub fn write_path_in_schema(&self) -> bool {
448+
self.write_path_in_schema
449+
}
450+
440451
/// EXPERIMENTAL: Returns content-defined chunking options, or `None` if CDC is disabled.
441452
///
442453
/// For more details see [`WriterPropertiesBuilder::set_content_defined_chunking`]
@@ -592,6 +603,7 @@ pub struct WriterPropertiesBuilder {
592603
statistics_truncate_length: Option<usize>,
593604
coerce_types: bool,
594605
content_defined_chunking: Option<CdcOptions>,
606+
write_path_in_schema: bool,
595607
#[cfg(feature = "encryption")]
596608
file_encryption_properties: Option<Arc<FileEncryptionProperties>>,
597609
}
@@ -616,6 +628,7 @@ impl Default for WriterPropertiesBuilder {
616628
statistics_truncate_length: DEFAULT_STATISTICS_TRUNCATE_LENGTH,
617629
coerce_types: DEFAULT_COERCE_TYPES,
618630
content_defined_chunking: None,
631+
write_path_in_schema: DEFAULT_WRITE_PATH_IN_SCHEMA,
619632
#[cfg(feature = "encryption")]
620633
file_encryption_properties: None,
621634
}
@@ -670,6 +683,7 @@ impl WriterPropertiesBuilder {
670683
statistics_truncate_length: self.statistics_truncate_length,
671684
coerce_types: self.coerce_types,
672685
content_defined_chunking: self.content_defined_chunking,
686+
write_path_in_schema: self.write_path_in_schema,
673687
#[cfg(feature = "encryption")]
674688
file_encryption_properties: self.file_encryption_properties,
675689
}
@@ -885,6 +899,43 @@ impl WriterPropertiesBuilder {
885899
self
886900
}
887901

902+
/// EXPERIMENTAL: Should the writer emit the `path_in_schema` element of the
903+
/// `ColumnMetaData` Thrift struct. Defaults to `true` via [`DEFAULT_WRITE_PATH_IN_SCHEMA`].
904+
///
905+
/// Because `path_in_schema` is a field on the `ColumnMetaData`, it is repeated
906+
/// `num_columns * num_rowgroups` times. Compounding this is any level of nesting or
907+
/// repetition in the schema. For instance, a top-level list column named `foo` will have
908+
/// a `path_in_schema` of `["foo", "list", "element"]`. A list-of-struct is even worse,
909+
/// because the necessary list wrapping is repeated for each element of the struct. A
910+
/// file with a deeply nested schema and many row groups can have a large percentage of the
911+
/// footer taken up by this field. For example, a file of 38 row groups with a schema containing
912+
/// several lists of structs containing lists had 36% of the footer taken up by `path_in_schema`.
913+
/// Removing this redundant information can greatly speed up footer parsing, which is particularly
914+
/// important in scenarios where one does not wish to read the entire file (e.g. point
915+
/// lookups).
916+
///
917+
/// <div class="warning">
918+
///
919+
/// **WARNING:**
920+
/// Setting this to `false` will break compatibility with Parquet readers that
921+
/// still expect this field to be present. Virtually all Parquet readers (parquet-java,
922+
/// Spark, arrow-cpp, pyarrow, pandas to name a few), with the exception
923+
/// of the one in this crate, expect this field to be present, and will terminate execution
924+
/// if it is not. This will continue to be the case unless/until the Parquet format
925+
/// specification is explicitly changed to allow this field to be missing. As a consquence,
926+
/// users should only set this to `false` if they have verified that any reader(s) they plan
927+
/// to use can tolerate the absence of this field.
928+
///
929+
/// For more context, see [GH-563].
930+
///
931+
/// </div>
932+
///
933+
/// [GH-563]: https://github.com/apache/parquet-format/issues/563
934+
pub fn set_write_path_in_schema(mut self, write_path_in_schema: bool) -> Self {
935+
self.write_path_in_schema = write_path_in_schema;
936+
self
937+
}
938+
888939
/// EXPERIMENTAL: Sets content-defined chunking options, or disables CDC with `None`.
889940
///
890941
/// When enabled, data page boundaries are determined by a rolling hash of the
@@ -1253,6 +1304,7 @@ impl From<WriterProperties> for WriterPropertiesBuilder {
12531304
statistics_truncate_length: props.statistics_truncate_length,
12541305
coerce_types: props.coerce_types,
12551306
content_defined_chunking: props.content_defined_chunking,
1307+
write_path_in_schema: props.write_path_in_schema,
12561308
#[cfg(feature = "encryption")]
12571309
file_encryption_properties: props.file_encryption_properties,
12581310
}

parquet/src/file/writer.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,12 +345,14 @@ impl<W: Write + Send> SerializedFileWriter<W> {
345345
let column_indexes = std::mem::take(&mut self.column_indexes);
346346
let offset_indexes = std::mem::take(&mut self.offset_indexes);
347347

348+
let write_path_in_schema = self.props.write_path_in_schema();
348349
let mut encoder = ThriftMetadataWriter::new(
349350
&mut self.buf,
350351
&self.descr,
351352
row_groups,
352353
Some(self.props.created_by().to_string()),
353354
self.props.writer_version().as_num(),
355+
write_path_in_schema,
354356
);
355357

356358
#[cfg(feature = "encryption")]

parquet/src/parquet_thrift.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -726,12 +726,29 @@ where
726726
/// [compact output]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md
727727
pub(crate) struct ThriftCompactOutputProtocol<W: Write> {
728728
writer: W,
729+
write_path_in_schema: bool,
729730
}
730731

731732
impl<W: Write> ThriftCompactOutputProtocol<W> {
732733
/// Create a new `ThriftCompactOutputProtocol` wrapping the byte sink `writer`.
733734
pub(crate) fn new(writer: W) -> Self {
734-
Self { writer }
735+
Self {
736+
writer,
737+
write_path_in_schema: true,
738+
}
739+
}
740+
741+
// TODO(ets): at some point there should probably be a properties object
742+
// to control aspects of thrift output. But since this is the only option to date
743+
// I'm choosing a simpler API.
744+
/// Control the writing of the `path_in_schema` element of the `ColumnMetaData`
745+
pub(crate) fn set_write_path_in_schema(&mut self, val: bool) {
746+
self.write_path_in_schema = val;
747+
}
748+
749+
/// Indicate whether or not to emit `path_in_schema`.
750+
pub(crate) fn write_path_in_schema(&self) -> bool {
751+
self.write_path_in_schema
735752
}
736753

737754
/// Write a single byte to the output stream.

0 commit comments

Comments
 (0)