
Commit 32996b6

perf(reader): Avoid second create_parquet_record_batch_stream_builder() call for migrated tables (#2176)
## Which issue does this PR close?

- Partially addresses #2172.

## What changes are included in this PR?

Introduces `open_parquet_file()`, which opens the file once and returns both the `ArrowFileReader` and the `ArrowReaderMetadata`. The caller inspects the metadata in memory for field IDs, optionally rebuilds the `ArrowReaderMetadata` with a custom schema for migrated tables, and then passes the original `ArrowFileReader` to `ParquetRecordBatchStreamBuilder::new_with_metadata()`. This eliminates the redundant file open that previously occurred for migrated tables.

## Are these changes tested?

Covered by existing tests. Also ran the full Iceberg Java suite via Comet.
1 parent db3c966 · commit 32996b6

2 files changed: 48 additions & 39 deletions


crates/iceberg/src/arrow/delete_file_loader.rs

Lines changed: 12 additions & 7 deletions
```diff
@@ -18,6 +18,7 @@
 use std::sync::Arc;
 
 use futures::{StreamExt, TryStreamExt};
+use parquet::arrow::ParquetRecordBatchStreamBuilder;
 
 use crate::arrow::ArrowReader;
 use crate::arrow::reader::ParquetReadOptions;
@@ -61,16 +62,20 @@ impl BasicDeleteFileLoader {
         Essentially a super-cut-down ArrowReader. We can't use ArrowReader directly
         as that introduces a circular dependency.
         */
-        let record_batch_stream = ArrowReader::create_parquet_record_batch_stream_builder(
+        let parquet_read_options = ParquetReadOptions::builder().build();
+
+        let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file(
             data_file_path,
-            self.file_io.clone(),
-            None,
+            &self.file_io,
             file_size_in_bytes,
-            ParquetReadOptions::builder().build(),
+            parquet_read_options,
         )
-        .await?
-        .build()?
-        .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")));
+        .await?;
+
+        let record_batch_stream =
+            ParquetRecordBatchStreamBuilder::new_with_metadata(parquet_file_reader, arrow_metadata)
+                .build()?
+                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")));
 
         Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }
```

crates/iceberg/src/arrow/reader.rs

Lines changed: 36 additions & 32 deletions
```diff
@@ -35,7 +35,7 @@ use fnv::FnvHashSet;
 use futures::future::BoxFuture;
 use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{
-    ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
+    ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
 };
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask};
@@ -320,12 +320,10 @@ impl ArrowReader {
         let delete_filter_rx =
             delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));
 
-        // Migrated tables lack field IDs, requiring us to inspect the schema to choose
-        // between field-ID-based or position-based projection
-        let initial_stream_builder = Self::create_parquet_record_batch_stream_builder(
+        // Open the Parquet file once, loading its metadata
+        let (parquet_file_reader, arrow_metadata) = Self::open_parquet_file(
             &task.data_file_path,
-            file_io.clone(),
-            None,
+            &file_io,
             task.file_size_in_bytes,
             parquet_read_options,
         )
@@ -334,7 +332,7 @@ impl ArrowReader {
         // Check if Parquet file has embedded field IDs
         // Corresponds to Java's ParquetSchemaUtil.hasIds()
         // Reference: parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java:118
-        let missing_field_ids = initial_stream_builder
+        let missing_field_ids = arrow_metadata
             .schema()
             .fields()
             .iter()
@@ -356,38 +354,42 @@ impl ArrowReader {
         // - Branch 1: hasIds(fileSchema) → trust embedded field IDs, use pruneColumns()
         // - Branch 2: nameMapping present → applyNameMapping(), then pruneColumns()
         // - Branch 3: fallback → addFallbackIds(), then pruneColumnsFallback()
-        let mut record_batch_stream_builder = if missing_field_ids {
+        let arrow_metadata = if missing_field_ids {
             // Parquet file lacks field IDs - must assign them before reading
             let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
                 // Branch 2: Apply name mapping to assign correct Iceberg field IDs
                 // Per spec rule #2: "Use schema.name-mapping.default metadata to map field id
                 // to columns without field id"
                 // Corresponds to Java's ParquetSchemaUtil.applyNameMapping()
                 apply_name_mapping_to_arrow_schema(
-                    Arc::clone(initial_stream_builder.schema()),
+                    Arc::clone(arrow_metadata.schema()),
                     name_mapping,
                 )?
             } else {
                 // Branch 3: No name mapping - use position-based fallback IDs
                 // Corresponds to Java's ParquetSchemaUtil.addFallbackIds()
-                add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema())
+                add_fallback_field_ids_to_arrow_schema(arrow_metadata.schema())
             };
 
             let options = ArrowReaderOptions::new().with_schema(arrow_schema);
-
-            Self::create_parquet_record_batch_stream_builder(
-                &task.data_file_path,
-                file_io.clone(),
-                Some(options),
-                task.file_size_in_bytes,
-                parquet_read_options,
-            )
-            .await?
+            ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
+                |e| {
+                    Error::new(
+                        ErrorKind::Unexpected,
+                        "Failed to create ArrowReaderMetadata with field ID schema",
+                    )
+                    .with_source(e)
+                },
+            )?
         } else {
             // Branch 1: File has embedded field IDs - trust them
-            initial_stream_builder
+            arrow_metadata
         };
 
+        // Build the stream reader, reusing the already-opened file reader
+        let mut record_batch_stream_builder =
+            ParquetRecordBatchStreamBuilder::new_with_metadata(parquet_file_reader, arrow_metadata);
+
         // Filter out metadata fields for Parquet projection (they don't exist in files)
         let project_field_ids_without_metadata: Vec<i32> = task
             .project_field_ids
@@ -579,30 +581,32 @@ impl ArrowReader {
         Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }
 
-    pub(crate) async fn create_parquet_record_batch_stream_builder(
+    /// Opens a Parquet file and loads its metadata, returning both the reader and metadata.
+    /// The reader can be reused to build a `ParquetRecordBatchStreamBuilder` without
+    /// reopening the file.
+    pub(crate) async fn open_parquet_file(
         data_file_path: &str,
-        file_io: FileIO,
-        arrow_reader_options: Option<ArrowReaderOptions>,
+        file_io: &FileIO,
         file_size_in_bytes: u64,
         parquet_read_options: ParquetReadOptions,
-    ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader>> {
-        // Get the metadata for the Parquet file we need to read and build
-        // a reader for the data within
+    ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
         let parquet_file = file_io.new_input(data_file_path)?;
         let parquet_reader = parquet_file.reader().await?;
-        let parquet_file_reader = ArrowFileReader::new(
+        let mut reader = ArrowFileReader::new(
             FileMetadata {
                 size: file_size_in_bytes,
             },
             parquet_reader,
         )
        .with_parquet_read_options(parquet_read_options);
 
-        // Create the record batch stream builder, which wraps the parquet file reader
-        let options = arrow_reader_options.unwrap_or_default();
-        let record_batch_stream_builder =
-            ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, options).await?;
-        Ok(record_batch_stream_builder)
+        let arrow_metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
+            .await
+            .map_err(|e| {
+                Error::new(ErrorKind::Unexpected, "Failed to load Parquet metadata").with_source(e)
+            })?;
+
+        Ok((reader, arrow_metadata))
     }
 
     /// computes a `RowSelection` from positional delete indices.
```
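
To make the new flow concrete, here is a minimal sketch of the same open-once pattern written directly against the parquet crate's public API rather than the iceberg-rust code. It assumes the parquet crate's `async` feature and a `tokio` file handle; the `read_once` helper and the placeholder schema patching are illustrative assumptions, while `load_async`, `try_new`, and `new_with_metadata` are the actual calls the change above relies on:

```rust
use std::sync::Arc;

use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder};
use tokio::fs::File;

// Hypothetical helper demonstrating the open-once pattern; not iceberg-rust code.
async fn read_once(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Open the file once and parse its footer into in-memory metadata.
    let mut file = File::open(path).await?;
    let metadata = ArrowReaderMetadata::load_async(&mut file, ArrowReaderOptions::new()).await?;

    // Inspect the Arrow schema in memory: does every field carry a Parquet field ID?
    let has_field_ids = metadata
        .schema()
        .fields()
        .iter()
        .all(|f| f.metadata().contains_key(PARQUET_FIELD_ID_META_KEY));

    // For files lacking field IDs (e.g. migrated tables), rebuild the reader
    // metadata with a custom schema, reusing the already-parsed footer.
    let metadata = if !has_field_ids {
        let patched_schema = Arc::clone(metadata.schema()); // placeholder: assign field IDs here
        let options = ArrowReaderOptions::new().with_schema(patched_schema);
        ArrowReaderMetadata::try_new(Arc::clone(metadata.metadata()), options)?
    } else {
        metadata
    };

    // Hand the same, still-open reader to the stream builder: no second file open.
    let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(file, metadata).build()?;
    let batches: Vec<_> = stream.try_collect().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}
```

The design hinges on `ArrowReaderMetadata::try_new` operating purely on the already-parsed footer, so switching to a field-ID-patched schema costs no additional I/O; `new_with_metadata` is likewise synchronous because everything it needs is already in memory.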
