Skip to content

Commit c8aefab

Browse files
authored
refactor: Split Parquet BloomFilter CPU and IO into separate states (#21285)
## Which issue does this PR close? - part of #20529 - Broken out of #20820 ## Rationale for this change We are trying to be explicit about CPU and IO in the Parquet reader; however, the code for applying bloom filters still did both CPU and IO in the same future. ## What changes are included in this PR? 1. Split the states for IO and CPU work when applying bloom filters ## Are these changes tested? Functionally, by existing tests. I also ran performance tests and they didn't show any substantial performance change. ## Are there any user-facing changes? No, this is internal code reorganization.
1 parent d972b78 commit c8aefab

File tree

2 files changed

+212
-80
lines changed

2 files changed

+212
-80
lines changed

datafusion/datasource-parquet/src/opener.rs

Lines changed: 120 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
2020
use crate::page_filter::PagePruningAccessPlanFilter;
2121
use crate::row_filter::build_projection_read_plan;
22-
use crate::row_group_filter::RowGroupAccessPlanFilter;
22+
use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter};
2323
use crate::{
2424
ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
2525
apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
@@ -70,7 +70,10 @@ use parquet::arrow::arrow_reader::{
7070
ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy,
7171
};
7272
use parquet::arrow::async_reader::AsyncFileReader;
73+
use parquet::arrow::parquet_column;
7374
use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
75+
use parquet::basic::Type;
76+
use parquet::bloom_filter::Sbbf;
7477
use parquet::errors::ParquetError;
7578
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
7679

@@ -163,6 +166,9 @@ pub(super) struct ParquetOpener {
163166
/// PruneWithStatistics
164167
/// |
165168
/// v
169+
/// LoadBloomFilters
170+
/// |
171+
/// v
166172
/// PruneWithBloomFilters
167173
/// |
168174
/// v
@@ -196,10 +202,10 @@ enum ParquetOpenState {
196202
LoadPageIndex(BoxFuture<'static, Result<FiltersPreparedParquetOpen>>),
197203
/// Pruning Row Groups
198204
PruneWithStatistics(Box<FiltersPreparedParquetOpen>),
199-
/// Pruning with Bloom Filters
200-
///
201-
/// TODO: split state as this currently does both I/O and CPU work
202-
PruneWithBloomFilters(BoxFuture<'static, Result<RowGroupsPrunedParquetOpen>>),
205+
/// Loading bloom filters required for row-group pruning
206+
LoadBloomFilters(BoxFuture<'static, Result<BloomFiltersLoadedParquetOpen>>),
207+
/// Pruning with preloaded Bloom Filters
208+
PruneWithBloomFilters(Box<BloomFiltersLoadedParquetOpen>),
203209
/// Builds the final reader stream
204210
///
205211
/// TODO: split state as this currently does both I/O and CPU work.
@@ -273,6 +279,17 @@ struct RowGroupsPrunedParquetOpen {
273279
row_groups: RowGroupAccessPlanFilter,
274280
}
275281

282+
/// State of [`ParquetOpenState`]
283+
///
284+
/// Result of loading bloom filters needed for row-group pruning.
///
/// Produced by `RowGroupsPrunedParquetOpen::load_bloom_filters` and consumed by
/// `prune_bloom_filters`, which hands the wrapped `prepared` state back.
285+
struct BloomFiltersLoadedParquetOpen {
286+
/// The row-group pruning state this value was built from; it is returned
/// unchanged in type once bloom filter pruning has been applied.
prepared: RowGroupsPrunedParquetOpen,
287+
/// Bloom filters loaded for each row group that remains under consideration.
288+
///
289+
/// Indexed by parquet row-group index. The vector holds one entry per row
/// group in the file metadata; entries for row groups that were not visited
/// while loading keep the value created by `BloomFilterStatistics::new()`.
290+
row_group_bloom_filters: Vec<BloomFilterStatistics>,
291+
}
292+
276293
/// Implements state machine described in [`ParquetOpenState`]
277294
struct ParquetOpenFuture {
278295
state: ParquetOpenState,
@@ -354,13 +371,16 @@ impl ParquetOpenState {
354371
}
355372
ParquetOpenState::PruneWithStatistics(prepared) => {
356373
let prepared_row_groups = prepared.prune_row_groups()?;
357-
Ok(ParquetOpenState::PruneWithBloomFilters(
358-
prepared_row_groups.prune_bloom_filters().boxed(),
374+
Ok(ParquetOpenState::LoadBloomFilters(
375+
prepared_row_groups.load_bloom_filters().boxed(),
359376
))
360377
}
361-
ParquetOpenState::PruneWithBloomFilters(future) => {
362-
Ok(ParquetOpenState::PruneWithBloomFilters(future))
378+
ParquetOpenState::LoadBloomFilters(future) => {
379+
Ok(ParquetOpenState::LoadBloomFilters(future))
363380
}
381+
ParquetOpenState::PruneWithBloomFilters(loaded) => Ok(
382+
ParquetOpenState::BuildStream(Box::new(loaded.prune_bloom_filters())),
383+
),
364384
ParquetOpenState::BuildStream(prepared) => {
365385
Ok(ParquetOpenState::Ready(prepared.build_stream()?))
366386
}
@@ -413,13 +433,13 @@ impl Future for ParquetOpenFuture {
413433
}
414434
};
415435
}
416-
ParquetOpenState::PruneWithBloomFilters(mut future) => {
436+
ParquetOpenState::LoadBloomFilters(mut future) => {
417437
state = match future.poll_unpin(cx) {
418438
Poll::Ready(result) => {
419-
ParquetOpenState::BuildStream(Box::new(result?))
439+
ParquetOpenState::PruneWithBloomFilters(Box::new(result?))
420440
}
421441
Poll::Pending => {
422-
self.state = ParquetOpenState::PruneWithBloomFilters(future);
442+
self.state = ParquetOpenState::LoadBloomFilters(future);
423443
return Poll::Pending;
424444
}
425445
};
@@ -438,6 +458,7 @@ impl Future for ParquetOpenFuture {
438458
ParquetOpenState::PruneFile(_) => {}
439459
ParquetOpenState::PrepareFilters(_) => {}
440460
ParquetOpenState::PruneWithStatistics(_) => {}
461+
ParquetOpenState::PruneWithBloomFilters(_) => {}
441462
ParquetOpenState::BuildStream(_) => {}
442463
};
443464

@@ -862,8 +883,17 @@ impl FiltersPreparedParquetOpen {
862883
}
863884

864885
impl RowGroupsPrunedParquetOpen {
865-
/// Apply bloom filter pruning when it is enabled and a pruning predicate exists.
866-
async fn prune_bloom_filters(mut self) -> Result<Self> {
886+
/// Load bloom filters needed for pruning when enabled and a pruning predicate exists.
887+
async fn load_bloom_filters(mut self) -> Result<BloomFiltersLoadedParquetOpen> {
888+
let num_row_groups = self
889+
.prepared
890+
.loaded
891+
.reader_metadata
892+
.metadata()
893+
.num_row_groups();
894+
let mut row_group_bloom_filters =
895+
vec![BloomFilterStatistics::new(); num_row_groups];
896+
867897
if let Some(predicate) =
868898
self.prepared.pruning_predicate.as_ref().map(|p| p.as_ref())
869899
&& self.prepared.loaded.prepared.enable_bloom_filter
@@ -887,19 +917,86 @@ impl RowGroupsPrunedParquetOpen {
887917
mem::replace(&mut prepared.async_file_reader, replacement_reader),
888918
reader_metadata,
889919
);
890-
self.row_groups
891-
.prune_by_bloom_filters(
892-
&prepared.physical_file_schema,
893-
&mut builder,
894-
predicate,
895-
&prepared.file_metrics,
896-
)
897-
.await;
920+
let parquet_columns: Vec<(String, usize, Type)> = predicate
921+
.literal_columns()
922+
.into_iter()
923+
.filter_map(|column_name| {
924+
let parquet_schema = builder.parquet_schema();
925+
let (column_idx, _) = parquet_column(
926+
parquet_schema,
927+
&prepared.physical_file_schema,
928+
&column_name,
929+
)?;
930+
Some((
931+
column_name,
932+
column_idx,
933+
parquet_schema.column(column_idx).physical_type(),
934+
))
935+
})
936+
.collect();
937+
938+
for idx in self.row_groups.row_group_indexes() {
939+
let mut row_group_filters =
940+
BloomFilterStatistics::with_capacity(parquet_columns.len());
941+
for (column_name, column_idx, physical_type) in &parquet_columns {
942+
let bf: Sbbf = match builder
943+
.get_row_group_column_bloom_filter(idx, *column_idx)
944+
.await
945+
{
946+
Ok(Some(bf)) => bf,
947+
Ok(None) => continue,
948+
Err(e) => {
949+
debug!("Ignoring error reading bloom filter: {e}");
950+
prepared.file_metrics.predicate_evaluation_errors.add(1);
951+
continue;
952+
}
953+
};
954+
row_group_filters.insert(column_name, bf, *physical_type);
955+
}
956+
row_group_bloom_filters[idx] = row_group_filters;
957+
}
898958
}
899959

900-
Ok(self)
960+
Ok(BloomFiltersLoadedParquetOpen {
961+
prepared: self,
962+
row_group_bloom_filters,
963+
})
901964
}
965+
}
902966

967+
impl BloomFiltersLoadedParquetOpen {
968+
/// Apply bloom filter pruning using already loaded bloom filters.
///
/// Consumes `self` and returns the wrapped [`RowGroupsPrunedParquetOpen`].
/// This function is not `async`: it only works with the bloom filters
/// already stored in `row_group_bloom_filters`.
///
/// Pruning is attempted only when all of the following hold:
/// - a pruning predicate is present,
/// - `enable_bloom_filter` is set, and
/// - `row_groups` is not empty.
///
/// Otherwise the wrapped state is returned as is.
969+
fn prune_bloom_filters(mut self) -> RowGroupsPrunedParquetOpen {
970+
// Take an owned handle to the metric up front.
// NOTE(review): presumably cloned so the timer guard below does not keep a
// borrow of `self` alive while `row_groups` is mutated; confirm.
let bloom_filter_eval_time = self
971+
.prepared
972+
.prepared
973+
.loaded
974+
.prepared
975+
.file_metrics
976+
.bloom_filter_eval_time
977+
.clone();
978+
// Bound to a named variable so the guard lives until the end of the function.
// NOTE(review): presumably the guard records elapsed time into
// `bloom_filter_eval_time` when dropped; confirm against the metrics type.
// The timer is started even when the condition below is false.
let _timer_guard = bloom_filter_eval_time.timer();
979+
if let Some(predicate) = self
980+
.prepared
981+
.prepared
982+
.pruning_predicate
983+
.as_ref()
984+
.map(|p| p.as_ref())
985+
&& self.prepared.prepared.loaded.prepared.enable_bloom_filter
986+
&& !self.prepared.row_groups.is_empty()
987+
{
988+
// Updates `row_groups` in place from the predicate and the preloaded filters.
self.prepared.row_groups.prune_by_bloom_filters(
989+
predicate,
990+
&self.prepared.prepared.loaded.prepared.file_metrics,
991+
&self.row_group_bloom_filters,
992+
);
993+
}
994+
995+
// Hand back the inner state; the loaded bloom filters are dropped here.
self.prepared
996+
}
997+
}
998+
999+
impl RowGroupsPrunedParquetOpen {
9031000
/// Build the final parquet stream once all pruning work is complete.
9041001
fn build_stream(self) -> Result<BoxStream<'static, Result<RecordBatch>>> {
9051002
let RowGroupsPrunedParquetOpen {

0 commit comments

Comments
 (0)