Skip to content

Commit f707ba8

Browse files
committed
Refactor open logic as well.
1 parent f7a0507 commit f707ba8

3 files changed

Lines changed: 42 additions & 24 deletions

File tree

crates/iceberg/src/arrow/delete_file_loader.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ impl BasicDeleteFileLoader {
6969
&self.file_io,
7070
file_size_in_bytes,
7171
parquet_read_options,
72-
None,
7372
)
7473
.await?;
7574

crates/iceberg/src/arrow/reader.rs

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ struct FileScanTaskReader {
254254
row_group_filtering_enabled: bool,
255255
row_selection_enabled: bool,
256256
parquet_read_options: ParquetReadOptions,
257-
bytes_read: Arc<AtomicU64>,
257+
scan_metrics: ScanMetrics,
258258
}
259259

260260
impl ArrowReader {
@@ -272,7 +272,7 @@ impl ArrowReader {
272272
tasks: FileScanTaskStream,
273273
) -> Result<(ArrowRecordBatchStream, ScanMetrics)> {
274274
let concurrency_limit_data_files = self.concurrency_limit_data_files;
275-
let (scan_metrics, bytes_read) = ScanMetrics::new();
275+
let scan_metrics = ScanMetrics::new();
276276

277277
let task_reader = FileScanTaskReader {
278278
batch_size: self.batch_size,
@@ -281,7 +281,7 @@ impl ArrowReader {
281281
row_group_filtering_enabled: self.row_group_filtering_enabled,
282282
row_selection_enabled: self.row_selection_enabled,
283283
parquet_read_options: self.parquet_read_options,
284-
bytes_read,
284+
scan_metrics: scan_metrics.clone(),
285285
};
286286

287287
// Fast-path for single concurrency to avoid overhead of try_flatten_unordered
@@ -323,13 +323,14 @@ impl FileScanTaskReader {
323323
.delete_file_loader
324324
.load_deletes(&task.deletes, Arc::clone(&task.schema));
325325

326-
// Open the Parquet file once, loading its metadata
327-
let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file(
326+
// Open the Parquet file once, loading its metadata.
327+
// Uses the counting variant so metadata + data page I/O is tracked.
328+
let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file_counted(
328329
&task.data_file_path,
329330
&self.file_io,
330331
task.file_size_in_bytes,
331332
parquet_read_options,
332-
Some(&self.bytes_read),
333+
self.scan_metrics.bytes_read_counter(),
333334
)
334335
.await?;
335336

@@ -608,24 +609,40 @@ impl FileScanTaskReader {
608609
}
609610

610611
impl ArrowReader {
611-
/// Opens a Parquet file and loads its metadata, returning both the reader and metadata.
612-
/// The reader can be reused to build a `ParquetRecordBatchStreamBuilder` without
613-
/// reopening the file.
612+
/// Opens a Parquet file and loads its metadata.
614613
pub(crate) async fn open_parquet_file(
615614
data_file_path: &str,
616615
file_io: &FileIO,
617616
file_size_in_bytes: u64,
618617
parquet_read_options: ParquetReadOptions,
619-
bytes_read: Option<&Arc<AtomicU64>>,
620618
) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
621619
let parquet_file = file_io.new_input(data_file_path)?;
622-
let parquet_reader: Box<dyn FileRead> = match bytes_read {
623-
Some(counter) => Box::new(CountingFileRead::new(
624-
parquet_file.reader().await?,
625-
Arc::clone(counter),
626-
)),
627-
None => parquet_file.reader().await?,
628-
};
620+
let parquet_reader = parquet_file.reader().await?;
621+
Self::build_parquet_reader(parquet_reader, file_size_in_bytes, parquet_read_options).await
622+
}
623+
624+
/// Opens a Parquet file wrapped with [`CountingFileRead`] so that all I/O
625+
/// (metadata + data pages) is accumulated into `bytes_read`.
626+
async fn open_parquet_file_counted(
627+
data_file_path: &str,
628+
file_io: &FileIO,
629+
file_size_in_bytes: u64,
630+
parquet_read_options: ParquetReadOptions,
631+
bytes_read: &Arc<AtomicU64>,
632+
) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
633+
let parquet_file = file_io.new_input(data_file_path)?;
634+
let parquet_reader: Box<dyn FileRead> = Box::new(CountingFileRead::new(
635+
parquet_file.reader().await?,
636+
Arc::clone(bytes_read),
637+
));
638+
Self::build_parquet_reader(parquet_reader, file_size_in_bytes, parquet_read_options).await
639+
}
640+
641+
async fn build_parquet_reader(
642+
parquet_reader: Box<dyn FileRead>,
643+
file_size_in_bytes: u64,
644+
parquet_read_options: ParquetReadOptions,
645+
) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
629646
let mut reader = ArrowFileReader::new(
630647
FileMetadata {
631648
size: file_size_in_bytes,

crates/iceberg/src/arrow/scan_metrics.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,14 @@ pub struct ScanMetrics {
5858
}
5959

6060
impl ScanMetrics {
61-
pub(crate) fn new() -> (Self, Arc<AtomicU64>) {
62-
let bytes_read = Arc::new(AtomicU64::new(0));
63-
let metrics = Self {
64-
bytes_read: Arc::clone(&bytes_read),
65-
};
66-
(metrics, bytes_read)
61+
pub(crate) fn new() -> Self {
62+
Self {
63+
bytes_read: Arc::new(AtomicU64::new(0)),
64+
}
65+
}
66+
67+
pub(crate) fn bytes_read_counter(&self) -> &Arc<AtomicU64> {
68+
&self.bytes_read
6769
}
6870

6971
/// Total bytes read from storage for data files during this scan.

0 commit comments

Comments
 (0)