Skip to content

Commit a849feb

Browse files
committed
Refactor: Enhance ParquetSource initialization with pushdown filters
1 parent 1a255ac commit a849feb

1 file changed

Lines changed: 16 additions & 14 deletions

File tree

datafusion_iceberg/src/table.rs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -478,13 +478,15 @@ async fn table_scan(
478478
.collect()
479479
});
480480

481-
let file_source = if let Some(physical_predicate) = physical_predicate.clone() {
482-
Arc::new(
483-
ParquetSource::default().with_predicate(Arc::clone(&file_schema), physical_predicate),
484-
)
485-
} else {
486-
Arc::new(ParquetSource::default())
487-
};
481+
let file_source = Arc::new(
482+
if let Some(physical_predicate) = physical_predicate.clone() {
483+
ParquetSource::default()
484+
.with_predicate(Arc::clone(&file_schema), physical_predicate)
485+
.with_pushdown_filters(true)
486+
} else {
487+
ParquetSource::default()
488+
}
489+
);
488490

489491
// Create plan for every partition with delete files
490492
let mut plans = stream::iter(equality_delete_file_groups.into_iter())
@@ -572,15 +574,15 @@ async fn table_scan(
572574
last_updated_ms,
573575
)?;
574576

575-
let delete_file_source =
577+
let delete_file_source = Arc::new(
576578
if let Some(physical_predicate) = physical_predicate.clone() {
577-
Arc::new(ParquetSource::default().with_predicate(
578-
Arc::clone(&delete_file_schema),
579-
physical_predicate,
580-
))
579+
ParquetSource::default()
580+
.with_predicate(Arc::clone(&delete_file_schema), physical_predicate)
581+
.with_pushdown_filters(true)
581582
} else {
582-
Arc::new(ParquetSource::default())
583-
};
583+
ParquetSource::default()
584+
}
585+
);
584586

585587
let delete_file_scan_config = FileScanConfig::new(
586588
object_store_url.clone(),

0 commit comments

Comments
 (0)