Skip to content

Commit 2fffad2

Browse files
adriangbclaude
andcommitted
feat(parquet-datasource): always accept pushable filters, run them post-scan
`ParquetSource::try_pushdown_filters` always returns the per-filter `Yes` / `No` discriminant from `can_expr_be_pushed_down_with_schemas`, regardless of the `pushdown_filters` config. The parent `FilterExec` is always removed for pushable filters, and the scan owns the predicate. The opener routes the predicate to the post-scan filter when `pushdown_filters = false`, in addition to the rejected-conjunct path that already exists for `pushdown_filters = true`: - `pushdown_filters = true` → row-filterable conjuncts via the parquet `RowFilter`; any rejected conjuncts via the post-scan filter (the correctness fix from the previous commit). - `pushdown_filters = false` → the whole predicate runs as a post-scan filter on decoded batches (behaviorally identical to a `FilterExec`). The `pushdown_filters` config keeps its meaning ("build a parquet `RowFilter`"); doc comments updated. Plan / test consequences (all results unchanged, plan shape and metrics change): - The `FilterExec` no longer appears above a `DataSourceExec` for pushable parquet filters. The predicate appears as `predicate=…` on the `DataSourceExec`. Parquet `.slt` files are regenerated to reflect this (clickbench, push_down_filter_parquet, projection_pushdown, parquet*, etc.). Spurious whitespace churn from `--complete` was reverted. - Opener / integration tests that asserted "row group not pruned ⇒ all rows returned" (e.g. `a = 1` over `[1, 2, 3]` returning 3 rows) are updated to reflect the matching-row count, since the scan now applies the predicate row-level via the post-scan filter. - `FilterExec: id@0 = 1` assertions in DataFrame / view tests become `predicate=id@0 = 1` on the `DataSourceExec`. - Insta inline snapshots in `parquet.rs` and `explain_analyze.rs` are re-accepted (`output_rows=8` → `output_rows=5` plus `post_scan_rows_pruned=3`, multi-line plans collapse where the `FilterExec`/`RepartitionExec` chain is gone). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent f174c94 commit 2fffad2

20 files changed

Lines changed: 277 additions & 451 deletions

datafusion/core/src/dataframe/parquet.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,14 @@ mod tests {
162162
.select_columns(&["bool_col", "int_col"])?;
163163

164164
let plan = df.explain(false, false)?.collect().await?;
165-
// Filters all the way to Parquet
165+
// Filters all the way to Parquet. The parquet scan now accepts the
166+
// pushable filter unconditionally so the `FilterExec` is removed —
167+
// the predicate appears as `predicate=` on the `DataSourceExec`.
166168
let formatted = pretty::pretty_format_batches(&plan)?.to_string();
167-
assert!(formatted.contains("FilterExec: id@0 = 1"), "{formatted}");
169+
assert!(
170+
formatted.contains("predicate=id@0 = 1"),
171+
"expected predicate=id@0 = 1 in {formatted}"
172+
);
168173

169174
Ok(())
170175
}

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 30 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -751,17 +751,12 @@ mod tests {
751751
.await
752752
.unwrap();
753753

754-
insta::assert_snapshot!(batches_to_sort_string(&read),@r"
755-
+-----+----+----+
756-
| c1 | c3 | c2 |
757-
+-----+----+----+
758-
| | | |
759-
| | 10 | 1 |
760-
| | 20 | |
761-
| | 20 | 2 |
762-
| Foo | 10 | |
763-
| bar | | |
764-
+-----+----+----+
754+
insta::assert_snapshot!(batches_to_sort_string(&read),@"
755+
+----+----+----+
756+
| c1 | c3 | c2 |
757+
+----+----+----+
758+
| | 20 | 2 |
759+
+----+----+----+
765760
");
766761
}
767762

@@ -896,7 +891,9 @@ mod tests {
896891
assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 0);
897892
assert_eq!(rt.batches.unwrap().len(), 0);
898893

899-
// Predicate should prune no row groups
894+
// Predicate should prune no row groups. The scan now applies the
895+
// predicate post-scan (since `pushdown_filters` is off in this test),
896+
// so only the matching row survives.
900897
let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("foo".to_string()))));
901898
let rt = RoundTrip::new()
902899
.with_predicate(filter)
@@ -913,7 +910,7 @@ mod tests {
913910
.iter()
914911
.map(|b| b.num_rows())
915912
.sum::<usize>();
916-
assert_eq!(read, 2, "Expected 2 rows to match the predicate");
913+
assert_eq!(read, 1, "Expected 1 row to match the predicate");
917914
}
918915

919916
#[tokio::test]
@@ -937,7 +934,9 @@ mod tests {
937934
assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
938935
assert_eq!(rt.batches.unwrap().len(), 0);
939936

940-
// Predicate should prune no row groups
937+
// Predicate should prune no row groups. The scan now applies the
938+
// predicate post-scan (since `pushdown_filters` is off in this test),
939+
// so only the matching row survives.
941940
let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(1))));
942941
let rt = RoundTrip::new()
943942
.with_predicate(filter)
@@ -953,7 +952,7 @@ mod tests {
953952
.iter()
954953
.map(|b| b.num_rows())
955954
.sum::<usize>();
956-
assert_eq!(read, 2, "Expected 2 rows to match the predicate");
955+
assert_eq!(read, 1, "Expected 1 row to match the predicate");
957956
}
958957

959958
#[tokio::test]
@@ -1051,17 +1050,12 @@ mod tests {
10511050
// In a real query where this predicate was pushed down from a filter stage instead of created directly in the `DataSourceExec`,
10521051
// the filter stage would be preserved as a separate execution plan stage so the actual query results would be as expected.
10531052

1054-
insta::assert_snapshot!(batches_to_sort_string(&read),@r"
1055-
+-----+----+
1056-
| c1 | c2 |
1057-
+-----+----+
1058-
| | |
1059-
| | |
1060-
| | 1 |
1061-
| | 2 |
1062-
| Foo | |
1063-
| bar | |
1064-
+-----+----+
1053+
insta::assert_snapshot!(batches_to_sort_string(&read),@"
1054+
+----+----+
1055+
| c1 | c2 |
1056+
+----+----+
1057+
| | 1 |
1058+
+----+----+
10651059
");
10661060
}
10671061

@@ -1147,23 +1141,14 @@ mod tests {
11471141
.round_trip(vec![batch1, batch2, batch3, batch4])
11481142
.await;
11491143

1150-
insta::assert_snapshot!(batches_to_sort_string(&rt.batches.unwrap()), @r"
1151-
+------+----+
1152-
| c1 | c2 |
1153-
+------+----+
1154-
| | 1 |
1155-
| | 2 |
1156-
| Bar | |
1157-
| Bar | 2 |
1158-
| Bar | 2 |
1159-
| Bar2 | |
1160-
| Bar3 | |
1161-
| Foo | |
1162-
| Foo | 1 |
1163-
| Foo | 1 |
1164-
| Foo2 | |
1165-
| Foo3 | |
1166-
+------+----+
1144+
insta::assert_snapshot!(batches_to_sort_string(&rt.batches.unwrap()), @"
1145+
+-----+----+
1146+
| c1 | c2 |
1147+
+-----+----+
1148+
| | 1 |
1149+
| Foo | 1 |
1150+
| Foo | 1 |
1151+
+-----+----+
11671152
");
11681153
let metrics = rt.parquet_exec.metrics().unwrap();
11691154

@@ -1232,11 +1217,10 @@ mod tests {
12321217
.await
12331218
.unwrap();
12341219

1235-
insta::assert_snapshot!(batches_to_sort_string(&read),@r"
1220+
insta::assert_snapshot!(batches_to_sort_string(&read),@"
12361221
+-----+----+
12371222
| c1 | c2 |
12381223
+-----+----+
1239-
| | 2 |
12401224
| Foo | 1 |
12411225
| bar | |
12421226
+-----+----+
@@ -1756,12 +1740,11 @@ mod tests {
17561740

17571741
let metrics = rt.parquet_exec.metrics().unwrap();
17581742

1759-
assert_snapshot!(batches_to_sort_string(&rt.batches.unwrap()),@r"
1743+
assert_snapshot!(batches_to_sort_string(&rt.batches.unwrap()),@"
17601744
+-----+
17611745
| int |
17621746
+-----+
17631747
| 4 |
1764-
| 5 |
17651748
+-----+
17661749
");
17671750
let (page_index_rows_pruned, page_index_rows_matched) =

datafusion/core/src/datasource/view_test.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -322,11 +322,16 @@ mod tests {
322322

323323
let plan = df.explain(false, false)?.collect().await?;
324324

325-
// Filters all the way to Parquet
325+
// Filters all the way to Parquet. The parquet scan now accepts the
326+
// pushable filter unconditionally so the `FilterExec` is removed —
327+
// the predicate appears as `predicate=` on the `DataSourceExec`.
326328
let formatted = arrow::util::pretty::pretty_format_batches(&plan)
327329
.unwrap()
328330
.to_string();
329-
assert!(formatted.contains("FilterExec: id@0 = 1"));
331+
assert!(
332+
formatted.contains("predicate=id@0 = 1"),
333+
"expected predicate=id@0 = 1 in {formatted}"
334+
);
330335
Ok(())
331336
}
332337

datafusion/core/tests/parquet/page_pruning.rs

Lines changed: 18 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -113,30 +113,20 @@ async fn page_index_filter_one_col() {
113113
let filter = col("month").eq(lit(1_i32));
114114

115115
let batches = get_filter_results(&state, filter.clone(), false).await;
116-
// `month = 1` from the page index should create below RowSelection
117-
// vec.push(RowSelector::select(312));
118-
// vec.push(RowSelector::skip(3330));
119-
// vec.push(RowSelector::select(339));
120-
// vec.push(RowSelector::skip(3319));
121-
// total 651 row
122-
assert_eq!(batches[0].num_rows(), 651);
116+
// `month = 1` from the page index narrows IO to ~651 candidate rows
117+
// (RowSelector skip/select pattern), and the in-scan post-scan filter
118+
// then rejects the 31 page-aligned rows that don't actually match.
119+
assert_eq!(batches[0].num_rows(), 620);
123120

124121
let batches = get_filter_results(&state, filter, true).await;
125122
assert_eq!(batches[0].num_rows(), 620);
126123

127124
// 2. create filter month == 1 or month == 2;
128125
let filter = col("month").eq(lit(1_i32)).or(col("month").eq(lit(2_i32)));
129126
let batches = get_filter_results(&state, filter.clone(), false).await;
130-
// `month = 1` or `month = 2` from the page index should create below RowSelection
131-
// vec.push(RowSelector::select(312));
132-
// vec.push(RowSelector::skip(900));
133-
// vec.push(RowSelector::select(312));
134-
// vec.push(RowSelector::skip(2118));
135-
// vec.push(RowSelector::select(339));
136-
// vec.push(RowSelector::skip(873));
137-
// vec.push(RowSelector::select(318));
138-
// vec.push(RowSelector::skip(2128));
139-
assert_eq!(batches[0].num_rows(), 1281);
127+
// Page-index pruning leaves a 1281-row candidate set; the scan applies
128+
// the predicate (RowFilter or post-scan) to land on 1180 matches.
129+
assert_eq!(batches[0].num_rows(), 1180);
140130

141131
let batches = get_filter_results(&state, filter, true).await;
142132
assert_eq!(batches[0].num_rows(), 1180);
@@ -154,23 +144,18 @@ async fn page_index_filter_one_col() {
154144
// 4.create filter 0 < month < 2 ;
155145
let filter = col("month").gt(lit(0_i32)).and(col("month").lt(lit(2_i32)));
156146
let batches = get_filter_results(&state, filter.clone(), false).await;
157-
// should same with `month = 1`
158-
assert_eq!(batches[0].num_rows(), 651);
147+
// Same matching set as `month = 1`.
148+
assert_eq!(batches[0].num_rows(), 620);
159149
let batches = get_filter_results(&state, filter, true).await;
160150
assert_eq!(batches[0].num_rows(), 620);
161151

162152
// 5.create filter date_string_col == "01/01/09"`;
163153
// Note this test doesn't apply type coercion so the literal must match the actual view type
164154
let filter = col("date_string_col").eq(lit(ScalarValue::new_utf8view("01/01/09")));
165155
let batches = get_filter_results(&state, filter.clone(), false).await;
166-
assert_eq!(batches[0].num_rows(), 14);
167-
168-
// there should only two pages match the filter
169-
// min max
170-
// page-20 0 01/01/09 01/02/09
171-
// page-21 0 01/01/09 01/01/09
172-
// each 7 rows
173-
assert_eq!(batches[0].num_rows(), 14);
156+
// Page index narrows to two pages of 7 rows each (14 rows); the scan
157+
// then rejects the 4 page-aligned rows that don't actually match.
158+
assert_eq!(batches[0].num_rows(), 10);
174159
let batches = get_filter_results(&state, filter, true).await;
175160
assert_eq!(batches[0].num_rows(), 10);
176161
}
@@ -183,11 +168,9 @@ async fn page_index_filter_multi_col() {
183168
// create filter month == 1 and year = 2009;
184169
let filter = col("month").eq(lit(1_i32)).and(col("year").eq(lit(2009)));
185170
let batches = get_filter_results(&state, filter.clone(), false).await;
186-
// `year = 2009` from the page index should create below RowSelection
187-
// vec.push(RowSelector::select(3663));
188-
// vec.push(RowSelector::skip(3642));
189-
// combine with `month = 1` total 333 row
190-
assert_eq!(batches[0].num_rows(), 333);
171+
// Page index narrows IO to ~333 candidate rows; the scan then rejects
172+
// the page-aligned rows that don't actually match, landing on 310.
173+
assert_eq!(batches[0].num_rows(), 310);
191174
let batches = get_filter_results(&state, filter, true).await;
192175
assert_eq!(batches[0].num_rows(), 310);
193176

@@ -197,15 +180,15 @@ async fn page_index_filter_multi_col() {
197180
.eq(lit(1_i32))
198181
.and(col("year").eq(lit(2009)).or(col("id").eq(lit(1))));
199182
let batches = get_filter_results(&state, filter.clone(), false).await;
200-
assert_eq!(batches[0].num_rows(), 651);
183+
assert_eq!(batches[0].num_rows(), 310);
201184
let batches = get_filter_results(&state, filter, true).await;
202185
assert_eq!(batches[0].num_rows(), 310);
203186

204187
// create filter (year = 2009 or id = 1)
205188
// this filter use two columns will not push down
206189
let filter = col("year").eq(lit(2009)).or(col("id").eq(lit(1)));
207190
let batches = get_filter_results(&state, filter.clone(), false).await;
208-
assert_eq!(batches[0].num_rows(), 7300);
191+
assert_eq!(batches[0].num_rows(), 3650);
209192
let batches = get_filter_results(&state, filter, true).await;
210193
assert_eq!(batches[0].num_rows(), 3650);
211194

@@ -218,7 +201,7 @@ async fn page_index_filter_multi_col() {
218201
.and(col("id").eq(lit(1)))
219202
.or(col("year").eq(lit(2010)));
220203
let batches = get_filter_results(&state, filter.clone(), false).await;
221-
assert_eq!(batches[0].num_rows(), 7300);
204+
assert_eq!(batches[0].num_rows(), 3651);
222205
let batches = get_filter_results(&state, filter, true).await;
223206
assert_eq!(batches[0].num_rows(), 3651);
224207
}

datafusion/core/tests/sql/explain_analyze.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -877,8 +877,12 @@ async fn parquet_explain_analyze() {
877877
.unwrap()
878878
.to_string();
879879

880-
// should contain aggregated stats
881-
assert_contains!(&formatted, "output_rows=8");
880+
// should contain aggregated stats. The scan now applies the predicate
881+
// post-scan (the `FilterExec` above it is removed), so `output_rows` is
882+
// the matching row count (5) rather than the decoded row count (8).
883+
// The 3 rejected rows show up as `post_scan_rows_pruned=3`.
884+
assert_contains!(&formatted, "output_rows=5");
885+
assert_contains!(&formatted, "post_scan_rows_pruned=3");
882886
assert_contains!(
883887
&formatted,
884888
"row_groups_pruned_bloom_filter=1 total \u{2192} 1 matched"
@@ -1010,14 +1014,10 @@ async fn parquet_recursive_projection_pushdown() -> Result<()> {
10101014

10111015
assert_snapshot!(
10121016
actual,
1013-
@r"
1017+
@"
10141018
SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
10151019
RecursiveQueryExec: name=number_series, is_distinct=false
1016-
CoalescePartitionsExec
1017-
ProjectionExec: expr=[id@0 as id, 1 as level]
1018-
FilterExec: id@0 = 1
1019-
RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1
1020-
DataSourceExec: file_groups={1 group: [[TMP_DIR/hierarchy.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)]
1020+
DataSourceExec: file_groups={1 group: [[TMP_DIR/hierarchy.parquet]]}, projection=[id, 1 as level], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)]
10211021
CoalescePartitionsExec
10221022
ProjectionExec: expr=[id@0 + 1 as id, level@1 + 1 as level]
10231023
FilterExec: id@0 < 10

0 commit comments

Comments
 (0)