Skip to content

Commit bf0d992

Browse files
committed
refactor: Split Parquet BloomFilter CPU and IO into separate states
1 parent 6412c3a commit bf0d992

File tree

2 files changed

+212
-80
lines changed

2 files changed

+212
-80
lines changed

datafusion/datasource-parquet/src/opener.rs

Lines changed: 120 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
2020
use crate::page_filter::PagePruningAccessPlanFilter;
2121
use crate::row_filter::build_projection_read_plan;
22-
use crate::row_group_filter::RowGroupAccessPlanFilter;
22+
use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter};
2323
use crate::{
2424
ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
2525
apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
@@ -70,7 +70,10 @@ use parquet::arrow::arrow_reader::{
7070
ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy,
7171
};
7272
use parquet::arrow::async_reader::AsyncFileReader;
73+
use parquet::arrow::parquet_column;
7374
use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
75+
use parquet::basic::Type;
76+
use parquet::bloom_filter::Sbbf;
7477
use parquet::errors::ParquetError;
7578
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
7679

@@ -163,6 +166,9 @@ pub(super) struct ParquetOpener {
163166
/// PruneWithStatistics
164167
/// |
165168
/// v
169+
/// LoadBloomFilters
170+
/// |
171+
/// v
166172
/// PruneWithBloomFilters
167173
/// |
168174
/// v
@@ -196,10 +202,10 @@ enum ParquetOpenState {
196202
LoadPageIndex(BoxFuture<'static, Result<FiltersPreparedParquetOpen>>),
197203
/// Pruning Row Groups
198204
PruneWithStatistics(Box<FiltersPreparedParquetOpen>),
199-
/// Pruning with Bloom Filters
200-
///
201-
/// TODO: split state as this currently does both I/O and CPU work
202-
PruneWithBloomFilters(BoxFuture<'static, Result<RowGroupsPrunedParquetOpen>>),
205+
/// Loading bloom filters required for row-group pruning
206+
LoadBloomFilters(BoxFuture<'static, Result<BloomFiltersLoadedParquetOpen>>),
207+
/// Pruning row groups with the already loaded bloom filters (CPU-only step, no I/O)
208+
PruneWithBloomFilters(Box<BloomFiltersLoadedParquetOpen>),
203209
/// Builds the final reader stream
204210
///
205211
/// TODO: split state as this currently does both I/O and CPU work.
@@ -273,6 +279,17 @@ struct RowGroupsPrunedParquetOpen {
273279
row_groups: RowGroupAccessPlanFilter,
274280
}
275281

282+
/// State of [`ParquetOpenState`]
283+
///
284+
/// Result of loading bloom filters needed for row-group pruning.
285+
struct BloomFiltersLoadedParquetOpen {
286+
prepared: RowGroupsPrunedParquetOpen,
287+
/// Bloom filters loaded for each row group that remains under consideration.
288+
///
289+
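/// Indexed by parquet row-group index, with one entry per row group in the file; entries whose bloom filters were not loaded keep the value from `BloomFilterStatistics::new()`.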
290+
row_group_bloom_filters: Vec<BloomFilterStatistics>,
291+
}
292+
276293
/// Implements state machine described in [`ParquetOpenState`]
277294
struct ParquetOpenFuture {
278295
state: ParquetOpenState,
@@ -354,13 +371,16 @@ impl ParquetOpenState {
354371
}
355372
ParquetOpenState::PruneWithStatistics(prepared) => {
356373
let prepared_row_groups = prepared.prune_row_groups()?;
357-
Ok(ParquetOpenState::PruneWithBloomFilters(
358-
prepared_row_groups.prune_bloom_filters().boxed(),
374+
Ok(ParquetOpenState::LoadBloomFilters(
375+
prepared_row_groups.load_bloom_filters().boxed(),
359376
))
360377
}
361-
ParquetOpenState::PruneWithBloomFilters(future) => {
362-
Ok(ParquetOpenState::PruneWithBloomFilters(future))
378+
ParquetOpenState::LoadBloomFilters(future) => {
379+
Ok(ParquetOpenState::LoadBloomFilters(future))
363380
}
381+
ParquetOpenState::PruneWithBloomFilters(loaded) => Ok(
382+
ParquetOpenState::BuildStream(Box::new(loaded.prune_bloom_filters())),
383+
),
364384
ParquetOpenState::BuildStream(prepared) => {
365385
Ok(ParquetOpenState::Ready(prepared.build_stream()?))
366386
}
@@ -413,13 +433,13 @@ impl Future for ParquetOpenFuture {
413433
}
414434
};
415435
}
416-
ParquetOpenState::PruneWithBloomFilters(mut future) => {
436+
ParquetOpenState::LoadBloomFilters(mut future) => {
417437
state = match future.poll_unpin(cx) {
418438
Poll::Ready(result) => {
419-
ParquetOpenState::BuildStream(Box::new(result?))
439+
ParquetOpenState::PruneWithBloomFilters(Box::new(result?))
420440
}
421441
Poll::Pending => {
422-
self.state = ParquetOpenState::PruneWithBloomFilters(future);
442+
self.state = ParquetOpenState::LoadBloomFilters(future);
423443
return Poll::Pending;
424444
}
425445
};
@@ -438,6 +458,7 @@ impl Future for ParquetOpenFuture {
438458
ParquetOpenState::PruneFile(_) => {}
439459
ParquetOpenState::PrepareFilters(_) => {}
440460
ParquetOpenState::PruneWithStatistics(_) => {}
461+
ParquetOpenState::PruneWithBloomFilters(_) => {}
441462
ParquetOpenState::BuildStream(_) => {}
442463
};
443464

@@ -862,8 +883,17 @@ impl FiltersPreparedParquetOpen {
862883
}
863884

864885
impl RowGroupsPrunedParquetOpen {
865-
/// Apply bloom filter pruning when it is enabled and a pruning predicate exists.
866-
async fn prune_bloom_filters(mut self) -> Result<Self> {
886+
/// Load the bloom filters needed for row-group pruning, when bloom filter pruning is enabled and a pruning predicate exists.
887+
async fn load_bloom_filters(mut self) -> Result<BloomFiltersLoadedParquetOpen> {
888+
let num_row_groups = self
889+
.prepared
890+
.loaded
891+
.reader_metadata
892+
.metadata()
893+
.num_row_groups();
894+
let mut row_group_bloom_filters =
895+
vec![BloomFilterStatistics::new(); num_row_groups];
896+
867897
if let Some(predicate) =
868898
self.prepared.pruning_predicate.as_ref().map(|p| p.as_ref())
869899
&& self.prepared.loaded.prepared.enable_bloom_filter
@@ -887,19 +917,86 @@ impl RowGroupsPrunedParquetOpen {
887917
mem::replace(&mut prepared.async_file_reader, replacement_reader),
888918
reader_metadata,
889919
);
890-
self.row_groups
891-
.prune_by_bloom_filters(
892-
&prepared.physical_file_schema,
893-
&mut builder,
894-
predicate,
895-
&prepared.file_metrics,
896-
)
897-
.await;
920+
let parquet_columns: Vec<(String, usize, Type)> = predicate
921+
.literal_columns()
922+
.into_iter()
923+
.filter_map(|column_name| {
924+
let parquet_schema = builder.parquet_schema();
925+
let (column_idx, _) = parquet_column(
926+
parquet_schema,
927+
&prepared.physical_file_schema,
928+
&column_name,
929+
)?;
930+
Some((
931+
column_name,
932+
column_idx,
933+
parquet_schema.column(column_idx).physical_type(),
934+
))
935+
})
936+
.collect();
937+
938+
for idx in self.row_groups.row_group_indexes() {
939+
let mut row_group_filters =
940+
BloomFilterStatistics::with_capacity(parquet_columns.len());
941+
for (column_name, column_idx, physical_type) in &parquet_columns {
942+
let bf: Sbbf = match builder
943+
.get_row_group_column_bloom_filter(idx, *column_idx)
944+
.await
945+
{
946+
Ok(Some(bf)) => bf,
947+
Ok(None) => continue,
948+
Err(e) => {
949+
debug!("Ignoring error reading bloom filter: {e}");
950+
prepared.file_metrics.predicate_evaluation_errors.add(1);
951+
continue;
952+
}
953+
};
954+
row_group_filters.insert(column_name, bf, *physical_type);
955+
}
956+
row_group_bloom_filters[idx] = row_group_filters;
957+
}
898958
}
899959

900-
Ok(self)
960+
Ok(BloomFiltersLoadedParquetOpen {
961+
prepared: self,
962+
row_group_bloom_filters,
963+
})
901964
}
965+
}
902966

967+
impl BloomFiltersLoadedParquetOpen {
968+
/// Apply bloom filter pruning using already loaded bloom filters.
969+
fn prune_bloom_filters(mut self) -> RowGroupsPrunedParquetOpen {
970+
let bloom_filter_eval_time = self
971+
.prepared
972+
.prepared
973+
.loaded
974+
.prepared
975+
.file_metrics
976+
.bloom_filter_eval_time
977+
.clone();
978+
let _timer_guard = bloom_filter_eval_time.timer();
979+
if let Some(predicate) = self
980+
.prepared
981+
.prepared
982+
.pruning_predicate
983+
.as_ref()
984+
.map(|p| p.as_ref())
985+
&& self.prepared.prepared.loaded.prepared.enable_bloom_filter
986+
&& !self.prepared.row_groups.is_empty()
987+
{
988+
self.prepared.row_groups.prune_by_bloom_filters(
989+
predicate,
990+
&self.prepared.prepared.loaded.prepared.file_metrics,
991+
&self.row_group_bloom_filters,
992+
);
993+
}
994+
995+
self.prepared
996+
}
997+
}
998+
999+
impl RowGroupsPrunedParquetOpen {
9031000
/// Build the final parquet stream once all pruning work is complete.
9041001
fn build_stream(self) -> Result<BoxStream<'static, Result<RecordBatch>>> {
9051002
let RowGroupsPrunedParquetOpen {

0 commit comments

Comments
 (0)