Skip to content

Commit 7e90f52

Browse files
authored
Refactor parquet row filter setup (#22191)
## Which issue does this PR close? N/A, follow-up to review comments from #21637. ## Rationale for this change PR #21637 added logic to skip row filters and page-index pruning for row groups that are fully matched by row-group statistics. The resulting opener code had a few local closures and manually tracked `Option<RowFilter>` state, which made the scan setup harder to follow. This also restores the public `PagePruningAccessPlanFilter::prune_plan_with_page_index` return type to `ParquetAccessPlan`. The extra fully-matched page count is still available internally for metrics without changing the public API. ## What changes are included in this PR? - Extract row filter construction and reuse into `RowFilterGenerator`. - Extract access-plan preparation into a helper function. - Extract push decoder builder setup into a helper function and config struct. - Add an internal `PagePruningResult` and `prune_plan_with_page_index_and_metrics` for opener metrics. - Keep the public `prune_plan_with_page_index` API returning `ParquetAccessPlan`. ## Are these changes tested? Yes. ```bash cargo fmt --all cargo fmt --all --check cargo test -p datafusion-datasource-parquet --lib fully_matched cargo test -p datafusion-datasource-parquet --lib test_page_pruning_predicate_respects_enable_page_index cargo clippy --all-targets --all-features -- -D warnings ``` ## Are there any user-facing changes? No user-facing behavior changes. This preserves the existing public `prune_plan_with_page_index` return type.
1 parent 66f82af commit 7e90f52

2 files changed

Lines changed: 218 additions & 116 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 173 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919
2020
use crate::access_plan::PreparedAccessPlan;
2121
use crate::page_filter::PagePruningAccessPlanFilter;
22-
use crate::row_filter::build_projection_read_plan;
22+
use crate::row_filter::{self, ParquetReadPlan, build_projection_read_plan};
2323
use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter};
2424
use crate::{
2525
ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
26-
apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
26+
apply_file_schema_type_coercions, coerce_int96_to_resolution,
2727
};
2828
use arrow::array::{RecordBatch, RecordBatchOptions};
2929
use arrow::datatypes::DataType;
@@ -76,7 +76,9 @@ use parquet::arrow::parquet_column;
7676
use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
7777
use parquet::basic::Type;
7878
use parquet::bloom_filter::Sbbf;
79-
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
79+
use parquet::file::metadata::{
80+
PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
81+
};
8082

8183
/// Stateless Parquet morselizer implementation.
8284
///
@@ -1076,41 +1078,6 @@ impl RowGroupsPrunedParquetOpen {
10761078
let file_metadata = Arc::clone(reader_metadata.metadata());
10771079
let rg_metadata = file_metadata.row_groups();
10781080

1079-
// Filter pushdown: evaluate predicates during scan.
1080-
// Keep the predicate around so we can rebuild RowFilter per decoder run
1081-
// when fully matched row groups split the scan into multiple decoders.
1082-
let pushdown_predicate = prepared
1083-
.pushdown_filters
1084-
.then_some(prepared.predicate.clone())
1085-
.flatten();
1086-
1087-
let try_build_row_filter =
1088-
|predicate: &Arc<dyn PhysicalExpr>| -> Option<RowFilter> {
1089-
match row_filter::build_row_filter(
1090-
predicate,
1091-
&prepared.physical_file_schema,
1092-
file_metadata.as_ref(),
1093-
prepared.reorder_predicates,
1094-
&prepared.file_metrics,
1095-
) {
1096-
Ok(Some(filter)) => Some(filter),
1097-
Ok(None) => None,
1098-
Err(e) => {
1099-
debug!(
1100-
"Ignoring error building row filter for '{predicate:?}': {e}"
1101-
);
1102-
None
1103-
}
1104-
}
1105-
};
1106-
1107-
// Build the first RowFilter eagerly; it will be reused for the first
1108-
// filtered decoder run and rebuilt from pushdown_predicate for any
1109-
// additional filtered runs.
1110-
let mut first_row_filter =
1111-
pushdown_predicate.as_ref().and_then(&try_build_row_filter);
1112-
let has_row_filter = first_row_filter.is_some();
1113-
11141081
// Prune by limit if limit is set and limit order is not sensitive
11151082
if let (Some(limit), false) = (prepared.limit, prepared.preserve_order) {
11161083
row_groups.prune_by_limit(limit, rg_metadata, &prepared.file_metrics);
@@ -1128,104 +1095,84 @@ impl RowGroupsPrunedParquetOpen {
11281095
&& !access_plan.is_empty()
11291096
&& let Some(page_pruning_predicate) = page_pruning_predicate
11301097
{
1131-
let (page_pruned_access_plan, pages_skipped_by_fully_matched) =
1132-
page_pruning_predicate.prune_plan_with_page_index(
1098+
let page_pruning_result = page_pruning_predicate
1099+
.prune_plan_with_page_index_and_metrics(
11331100
access_plan,
11341101
&prepared.physical_file_schema,
11351102
reader_metadata.parquet_schema(),
11361103
file_metadata.as_ref(),
11371104
&prepared.file_metrics,
11381105
);
1139-
access_plan = page_pruned_access_plan;
1106+
access_plan = page_pruning_result.access_plan;
11401107
ParquetFileMetrics::add_page_index_pages_skipped_by_fully_matched(
11411108
&prepared.metrics,
11421109
prepared.partition_index,
11431110
&prepared.file_name,
1144-
pages_skipped_by_fully_matched,
1111+
page_pruning_result.pages_skipped_by_fully_matched,
11451112
);
11461113
}
11471114

1148-
// Prepare access plans (extract row groups and row selection).
1149-
let prepare_access_plan =
1150-
|plan: ParquetAccessPlan| -> Result<PreparedAccessPlan> {
1151-
let mut prepared_access_plan = plan.prepare(rg_metadata)?;
1152-
if prepared.reverse_row_groups {
1153-
prepared_access_plan =
1154-
prepared_access_plan.reverse(file_metadata.as_ref())?;
1155-
}
1156-
Ok(prepared_access_plan)
1157-
};
1158-
11591115
let arrow_reader_metrics = ArrowReaderMetrics::enabled();
11601116
let read_plan = build_projection_read_plan(
11611117
prepared.projection.expr_iter(),
11621118
&prepared.physical_file_schema,
11631119
reader_metadata.parquet_schema(),
11641120
);
11651121

1166-
// Split into consecutive runs of row groups that share the same filter
1167-
// requirement. Fully matched row groups skip the RowFilter; others need it.
1168-
// Reverse the run order for reverse scans so the combined decoder stream
1169-
// preserves the requested global row group order.
1170-
let mut runs = access_plan.split_runs(has_row_filter);
1171-
if prepared.reverse_row_groups {
1172-
runs.reverse();
1173-
}
1174-
let run_count = runs.len();
1175-
let decoder_limit = prepared.limit.filter(|_| run_count == 1);
1176-
let remaining_limit = prepared.limit.filter(|_| run_count > 1);
1177-
1178-
// Helper: configure a decoder builder with shared options from
1179-
// the prepared plan.
1180-
let build_decoder = |prepared_access_plan: PreparedAccessPlan,
1181-
metadata: ArrowReaderMetadata|
1182-
-> Result<ParquetPushDecoderBuilder> {
1183-
let mut builder = ParquetPushDecoderBuilder::new_with_metadata(metadata)
1184-
.with_projection(read_plan.projection_mask.clone())
1185-
.with_batch_size(prepared.batch_size)
1186-
.with_metrics(arrow_reader_metrics.clone());
1187-
if prepared.force_filter_selections {
1188-
builder =
1189-
builder.with_row_selection_policy(RowSelectionPolicy::Selectors);
1190-
}
1191-
if let Some(row_selection) = prepared_access_plan.row_selection {
1192-
builder = builder.with_row_selection(row_selection);
1122+
let (decoder, pending_decoders, remaining_limit) = {
1123+
let mut row_filter_generator =
1124+
RowFilterGenerator::new(&prepared, file_metadata.as_ref());
1125+
1126+
// Split into consecutive runs of row groups that share the same filter
1127+
// requirement. Fully matched row groups skip the RowFilter; others need it.
1128+
// Reverse the run order for reverse scans so the combined decoder stream
1129+
// preserves the requested global row group order.
1130+
let mut runs = access_plan.split_runs(row_filter_generator.has_row_filter());
1131+
if prepared.reverse_row_groups {
1132+
runs.reverse();
11931133
}
1194-
builder = builder.with_row_groups(prepared_access_plan.row_group_indexes);
1195-
if let Some(limit) = decoder_limit {
1196-
builder = builder.with_limit(limit);
1197-
}
1198-
Ok(builder)
1199-
};
1134+
let run_count = runs.len();
1135+
let decoder_limit = prepared.limit.filter(|_| run_count == 1);
1136+
let remaining_limit = prepared.limit.filter(|_| run_count > 1);
1137+
1138+
let decoder_config = DecoderBuilderConfig {
1139+
read_plan: &read_plan,
1140+
batch_size: prepared.batch_size,
1141+
arrow_reader_metrics: &arrow_reader_metrics,
1142+
force_filter_selections: prepared.force_filter_selections,
1143+
decoder_limit,
1144+
};
12001145

1201-
// Build a decoder per run.
1202-
let mut decoders = VecDeque::with_capacity(runs.len());
1203-
for run in runs {
1204-
let prepared_access_plan = prepare_access_plan(run.access_plan)?;
1205-
let mut builder =
1206-
build_decoder(prepared_access_plan, reader_metadata.clone())?;
1207-
if run.needs_filter {
1208-
// Reuse pre-built filter for the first filtered run,
1209-
// rebuild from the predicate for subsequent ones.
1210-
let row_filter = first_row_filter.take().or_else(|| {
1211-
pushdown_predicate.as_ref().and_then(&try_build_row_filter)
1212-
});
1213-
if let Some(row_filter) = row_filter {
1214-
builder = builder.with_row_filter(row_filter);
1215-
}
1216-
if let Some(max_predicate_cache_size) = prepared.max_predicate_cache_size
1217-
{
1218-
builder =
1219-
builder.with_max_predicate_cache_size(max_predicate_cache_size);
1146+
// Build a decoder per run.
1147+
let mut decoders = VecDeque::with_capacity(runs.len());
1148+
for run in runs {
1149+
let prepared_access_plan = prepare_access_plan(
1150+
run.access_plan,
1151+
rg_metadata,
1152+
file_metadata.as_ref(),
1153+
prepared.reverse_row_groups,
1154+
)?;
1155+
let mut builder =
1156+
decoder_config.build(prepared_access_plan, reader_metadata.clone());
1157+
if run.needs_filter {
1158+
if let Some(row_filter) = row_filter_generator.next_filter() {
1159+
builder = builder.with_row_filter(row_filter);
1160+
}
1161+
if let Some(max_predicate_cache_size) =
1162+
prepared.max_predicate_cache_size
1163+
{
1164+
builder = builder
1165+
.with_max_predicate_cache_size(max_predicate_cache_size);
1166+
}
12201167
}
1168+
decoders.push_back(builder.build()?);
12211169
}
1222-
decoders.push_back(builder.build()?);
1223-
}
12241170

1225-
let decoder = decoders
1226-
.pop_front()
1227-
.expect("at least one decoder must be created");
1228-
let pending_decoders = decoders;
1171+
let decoder = decoders
1172+
.pop_front()
1173+
.expect("at least one decoder must be created");
1174+
(decoder, decoders, remaining_limit)
1175+
};
12291176

12301177
let predicate_cache_inner_records =
12311178
prepared.file_metrics.predicate_cache_inner_records.clone();
@@ -1280,6 +1227,121 @@ impl RowGroupsPrunedParquetOpen {
12801227
}
12811228
}
12821229

1230+
/// Builds row filters for decoder runs.
1231+
///
1232+
/// A [`RowFilter`] must be owned by a decoder, so scans split across multiple
1233+
/// decoder runs need a fresh filter for each run that evaluates row predicates.
1234+
struct RowFilterGenerator<'a> {
1235+
predicate: Option<&'a Arc<dyn PhysicalExpr>>,
1236+
physical_file_schema: &'a SchemaRef,
1237+
file_metadata: &'a ParquetMetaData,
1238+
reorder_predicates: bool,
1239+
file_metrics: &'a ParquetFileMetrics,
1240+
first_row_filter: Option<RowFilter>,
1241+
}
1242+
1243+
impl<'a> RowFilterGenerator<'a> {
1244+
fn new(
1245+
prepared: &'a PreparedParquetOpen,
1246+
file_metadata: &'a ParquetMetaData,
1247+
) -> Self {
1248+
let predicate = prepared
1249+
.pushdown_filters
1250+
.then_some(prepared.predicate.as_ref())
1251+
.flatten();
1252+
1253+
let mut generator = Self {
1254+
predicate,
1255+
physical_file_schema: &prepared.physical_file_schema,
1256+
file_metadata,
1257+
reorder_predicates: prepared.reorder_predicates,
1258+
file_metrics: &prepared.file_metrics,
1259+
first_row_filter: None,
1260+
};
1261+
generator.first_row_filter = generator.build_row_filter();
1262+
generator
1263+
}
1264+
1265+
fn has_row_filter(&self) -> bool {
1266+
self.first_row_filter.is_some()
1267+
}
1268+
1269+
fn next_filter(&mut self) -> Option<RowFilter> {
1270+
self.first_row_filter
1271+
.take()
1272+
.or_else(|| self.build_row_filter())
1273+
}
1274+
1275+
fn build_row_filter(&self) -> Option<RowFilter> {
1276+
let predicate = self.predicate?;
1277+
match row_filter::build_row_filter(
1278+
predicate,
1279+
self.physical_file_schema,
1280+
self.file_metadata,
1281+
self.reorder_predicates,
1282+
self.file_metrics,
1283+
) {
1284+
Ok(Some(filter)) => Some(filter),
1285+
Ok(None) => None,
1286+
Err(e) => {
1287+
debug!("Ignoring error building row filter for '{predicate:?}': {e}");
1288+
None
1289+
}
1290+
}
1291+
}
1292+
}
1293+
1294+
fn prepare_access_plan(
1295+
plan: ParquetAccessPlan,
1296+
rg_metadata: &[RowGroupMetaData],
1297+
file_metadata: &ParquetMetaData,
1298+
reverse_row_groups: bool,
1299+
) -> Result<PreparedAccessPlan> {
1300+
let mut prepared_access_plan = plan.prepare(rg_metadata)?;
1301+
if reverse_row_groups {
1302+
prepared_access_plan = prepared_access_plan.reverse(file_metadata)?;
1303+
}
1304+
Ok(prepared_access_plan)
1305+
}
1306+
1307+
/// State shared while building [`ParquetPushDecoder`]s for one file scan.
1308+
///
1309+
/// A scan can be split into multiple decoder runs when row groups have
1310+
/// different filtering requirements. This config holds the options that apply
1311+
/// to every [`ParquetPushDecoderBuilder`], while each run supplies its own
1312+
/// [`PreparedAccessPlan`] and optional row filter.
1313+
struct DecoderBuilderConfig<'a> {
1314+
read_plan: &'a ParquetReadPlan,
1315+
batch_size: usize,
1316+
arrow_reader_metrics: &'a ArrowReaderMetrics,
1317+
force_filter_selections: bool,
1318+
decoder_limit: Option<usize>,
1319+
}
1320+
1321+
impl DecoderBuilderConfig<'_> {
1322+
fn build(
1323+
&self,
1324+
prepared_access_plan: PreparedAccessPlan,
1325+
metadata: ArrowReaderMetadata,
1326+
) -> ParquetPushDecoderBuilder {
1327+
let mut builder = ParquetPushDecoderBuilder::new_with_metadata(metadata)
1328+
.with_projection(self.read_plan.projection_mask.clone())
1329+
.with_batch_size(self.batch_size)
1330+
.with_metrics(self.arrow_reader_metrics.clone());
1331+
if self.force_filter_selections {
1332+
builder = builder.with_row_selection_policy(RowSelectionPolicy::Selectors);
1333+
}
1334+
if let Some(row_selection) = prepared_access_plan.row_selection {
1335+
builder = builder.with_row_selection(row_selection);
1336+
}
1337+
builder = builder.with_row_groups(prepared_access_plan.row_group_indexes);
1338+
if let Some(limit) = self.decoder_limit {
1339+
builder = builder.with_limit(limit);
1340+
}
1341+
builder
1342+
}
1343+
}
1344+
12831345
/// State for a stream that decodes a single Parquet file using a push-based decoder.
12841346
///
12851347
/// The [`transition`](Self::transition) method drives the decoder in a loop: it requests

0 commit comments

Comments
 (0)