Skip to content

Commit 709a244

Browse files
authored
perf(reader): Add Parquet metadata size hint option to ArrowReaderBuilder (#2173)
## Which issue does this PR close?

- Partially addresses #2172.

## What changes are included in this PR?

Adds `with_metadata_size_hint` to `ArrowReaderBuilder`, allowing callers to configure the number of bytes to prefetch when reading Parquet footer metadata. No default is set — callers opt in via the builder. When unset, behavior is unchanged.

## Are these changes tested?

Existing tests.
1 parent 118f5a5 commit 709a244

2 files changed

Lines changed: 26 additions & 1 deletion

File tree

crates/iceberg/src/arrow/delete_file_loader.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -64,6 +64,7 @@ impl BasicDeleteFileLoader {
6464
self.file_io.clone(),
6565
false,
6666
None,
67+
None,
6768
)
6869
.await?
6970
.build()?

crates/iceberg/src/arrow/reader.rs

Lines changed: 25 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -67,6 +67,7 @@ pub struct ArrowReaderBuilder {
6767
concurrency_limit_data_files: usize,
6868
row_group_filtering_enabled: bool,
6969
row_selection_enabled: bool,
70+
metadata_size_hint: Option<usize>,
7071
}
7172

7273
impl ArrowReaderBuilder {
@@ -80,6 +81,7 @@ impl ArrowReaderBuilder {
8081
concurrency_limit_data_files: num_cpus,
8182
row_group_filtering_enabled: true,
8283
row_selection_enabled: false,
84+
metadata_size_hint: None,
8385
}
8486
}
8587

@@ -108,6 +110,15 @@ impl ArrowReaderBuilder {
108110
self
109111
}
110112

113+
/// Provide a hint as to the number of bytes to prefetch for parsing the Parquet metadata
114+
///
115+
/// This hint can help reduce the number of fetch requests. For more details see the
116+
/// [ParquetMetaDataReader documentation](https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html#method.with_prefetch_hint).
117+
pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
118+
self.metadata_size_hint = Some(metadata_size_hint);
119+
self
120+
}
121+
111122
/// Build the ArrowReader.
112123
pub fn build(self) -> ArrowReader {
113124
ArrowReader {
@@ -120,6 +131,7 @@ impl ArrowReaderBuilder {
120131
concurrency_limit_data_files: self.concurrency_limit_data_files,
121132
row_group_filtering_enabled: self.row_group_filtering_enabled,
122133
row_selection_enabled: self.row_selection_enabled,
134+
metadata_size_hint: self.metadata_size_hint,
123135
}
124136
}
125137
}
@@ -136,6 +148,7 @@ pub struct ArrowReader {
136148

137149
row_group_filtering_enabled: bool,
138150
row_selection_enabled: bool,
151+
metadata_size_hint: Option<usize>,
139152
}
140153

141154
impl ArrowReader {
@@ -147,6 +160,7 @@ impl ArrowReader {
147160
let concurrency_limit_data_files = self.concurrency_limit_data_files;
148161
let row_group_filtering_enabled = self.row_group_filtering_enabled;
149162
let row_selection_enabled = self.row_selection_enabled;
163+
let metadata_size_hint = self.metadata_size_hint;
150164

151165
// Fast-path for single concurrency to avoid overhead of try_flatten_unordered
152166
let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
@@ -162,6 +176,7 @@ impl ArrowReader {
162176
self.delete_file_loader.clone(),
163177
row_group_filtering_enabled,
164178
row_selection_enabled,
179+
metadata_size_hint,
165180
)
166181
})
167182
.map_err(|err| {
@@ -183,6 +198,7 @@ impl ArrowReader {
183198
self.delete_file_loader.clone(),
184199
row_group_filtering_enabled,
185200
row_selection_enabled,
201+
metadata_size_hint,
186202
)
187203
})
188204
.map_err(|err| {
@@ -205,6 +221,7 @@ impl ArrowReader {
205221
delete_file_loader: CachingDeleteFileLoader,
206222
row_group_filtering_enabled: bool,
207223
row_selection_enabled: bool,
224+
metadata_size_hint: Option<usize>,
208225
) -> Result<ArrowRecordBatchStream> {
209226
let should_load_page_index =
210227
(row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
@@ -219,6 +236,7 @@ impl ArrowReader {
219236
file_io.clone(),
220237
should_load_page_index,
221238
None,
239+
metadata_size_hint,
222240
)
223241
.await?;
224242

@@ -271,6 +289,7 @@ impl ArrowReader {
271289
file_io.clone(),
272290
should_load_page_index,
273291
Some(options),
292+
metadata_size_hint,
274293
)
275294
.await?
276295
} else {
@@ -474,17 +493,22 @@ impl ArrowReader {
474493
file_io: FileIO,
475494
should_load_page_index: bool,
476495
arrow_reader_options: Option<ArrowReaderOptions>,
496+
metadata_size_hint: Option<usize>,
477497
) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader>> {
478498
// Get the metadata for the Parquet file we need to read and build
479499
// a reader for the data within
480500
let parquet_file = file_io.new_input(data_file_path)?;
481501
let (parquet_metadata, parquet_reader) =
482502
try_join!(parquet_file.metadata(), parquet_file.reader())?;
483-
let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader)
503+
let mut parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader)
484504
.with_preload_column_index(true)
485505
.with_preload_offset_index(true)
486506
.with_preload_page_index(should_load_page_index);
487507

508+
if let Some(hint) = metadata_size_hint {
509+
parquet_file_reader = parquet_file_reader.with_metadata_size_hint(hint);
510+
}
511+
488512
// Create the record batch stream builder, which wraps the parquet file reader
489513
let options = arrow_reader_options.unwrap_or_default();
490514
let record_batch_stream_builder =

0 commit comments

Comments (0)