Skip to content

Commit bc7d0e0

Browse files
committed
metadata_size_hint in incremental scan
1 parent a294d1a commit bc7d0e0

2 files changed

Lines changed: 11 additions & 5 deletions

File tree

crates/iceberg/src/arrow/incremental.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ async fn process_incremental_append_task(
6262
task: AppendedFileScanTask,
6363
batch_size: Option<usize>,
6464
file_io: FileIO,
65+
metadata_size_hint: Option<usize>,
6566
) -> Result<ArrowRecordBatchStream> {
6667
let mut virtual_columns = Vec::new();
6768

@@ -83,7 +84,7 @@ async fn process_incremental_append_task(
8384
file_io,
8485
true,
8586
arrow_reader_options,
86-
None,
87+
metadata_size_hint,
8788
task.base.file_size_in_bytes,
8889
)
8990
.await?;
@@ -246,6 +247,7 @@ impl StreamsInto<ArrowReader, UnzippedIncrementalBatchRecordStream>
246247
channel::<Result<RecordBatch>>(reader.concurrency_limit_data_files);
247248

248249
let batch_size = reader.batch_size;
250+
let metadata_size_hint = reader.metadata_size_hint;
249251

250252
let (append_stream, delete_stream) = self;
251253

@@ -258,9 +260,13 @@ impl StreamsInto<ArrowReader, UnzippedIncrementalBatchRecordStream>
258260
let appends_tx = appends_tx.clone();
259261
async move {
260262
spawn(async move {
261-
let record_batch_stream =
262-
process_incremental_append_task(append_task, batch_size, file_io)
263-
.await;
263+
let record_batch_stream = process_incremental_append_task(
264+
append_task,
265+
batch_size,
266+
file_io,
267+
metadata_size_hint,
268+
)
269+
.await;
264270

265271
process_record_batch_stream(
266272
record_batch_stream,

crates/iceberg/src/arrow/reader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use bytes::Bytes;
3434
use fnv::FnvHashSet;
3535
use futures::channel::mpsc::channel;
3636
use futures::future::BoxFuture;
37-
use futures::{FutureExt, SinkExt, Stream, StreamExt, TryFutureExt, TryStreamExt, try_join};
37+
use futures::{FutureExt, SinkExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
3838
use parquet::arrow::arrow_reader::{
3939
ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
4040
};

0 commit comments

Comments
 (0)