Skip to content

Commit f174c94

Browse files
adriangbclaude
andcommitted
fix(parquet-datasource): never drop conjuncts the RowFilter cannot place
`build_row_filter` (and its `RowFilterGenerator` wrapper) silently dropped conjuncts that `FilterCandidateBuilder::build` rejected (`Ok(None)` was `.flatten()`-ed away) and swallowed whole-build errors. By the time `build_row_filter` runs, `ParquetSource::try_pushdown_filters` has already accepted the filter and the parent `FilterExec` has been removed, so those dropped conjuncts were never applied anywhere — wrong results. Most reproducible trigger: the per-file expr adapter rewrites a predicate that was pushable at *table schema* time into something the `PushdownChecker` rejects at *physical file schema* time (schema evolution / coercion / whole-struct references introduced by the rewrite). Surface the rejected conjuncts instead of dropping them: - `build_row_filter` now returns `Result<(Option<RowFilter>, Vec<Arc<dyn PhysicalExpr>>)>`. The second element is the conjuncts it could not place. Bench / in-file test call sites updated. - `RowFilterGenerator` exposes `rejected_conjuncts()`. On a whole-file build error it routes every conjunct through that list, so an error no longer relaxes the predicate. - `DecoderProjection::build` grows a `post_scan_conjuncts` parameter and a `post_scan_filter: Option<PostScanFilter>` field. When non-empty it widens the decoder mask (over the user projection ∪ post-scan filter columns), rebases the conjuncts onto the stream schema, and returns a `PostScanFilter` that the stream applies to every decoded batch with SQL `WHERE` semantics (mirroring `FilterExec`'s `batch_filter`). - `PushDecoderStreamState` carries the optional `PostScanFilter` and applies it in the `DecodeResult::Data` arm, skipping empty batches. - The decoder-local LIMIT is unsafe with a post-scan filter (the decoder would short-circuit before the filter rejects enough rows), so the opener routes the limit to `remaining_limit` whenever a post-scan filter is present. - New `post_scan_rows_pruned` / `post_scan_rows_matched` counters and `post_scan_filter_eval_time` `Time` on `ParquetFileMetrics`, mirroring the existing `pushdown_rows_*` / `row_pushdown_eval_time` so `EXPLAIN ANALYZE` keeps surfacing filter cost. Two regression tests: - `build_row_filter_surfaces_rejected_struct_conjunct` (`row_filter.rs`) asserts the new API contract directly — the rejected struct conjunct is returned, not dropped. - `rejected_struct_conjunct_runs_post_scan_not_dropped` (`opener/mod.rs`) is end-to-end: with `pushdown_filters=true` and a `s IS NOT NULL` predicate over a struct column where one row is NULL, `main` returns 3 rows (conjunct silently dropped, predicate relaxed); after this fix it correctly returns 2. The `pushdown_filters = false` path is intentionally unchanged in this commit — `try_pushdown_filters` still leaves the `FilterExec` above the scan in that case. Always-accepting filters and removing the `FilterExec` unconditionally is a separate behaviour change in a follow-up commit. `push_down_filter_parquet.slt` updated for the new `post_scan_rows_*` metric lines on `EXPLAIN ANALYZE` output. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent e1c1a45 commit f174c94

8 files changed

Lines changed: 504 additions & 117 deletions

File tree

datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,9 @@ fn scan_with_predicate(
115115
let file_metrics = ParquetFileMetrics::new(0, &path.display().to_string(), &metrics);
116116

117117
let builder = if pushdown {
118-
if let Some(row_filter) =
119-
build_row_filter(predicate, file_schema, &metadata, false, &file_metrics)?
120-
{
118+
let (row_filter, _rejected) =
119+
build_row_filter(predicate, file_schema, &metadata, false, &file_metrics)?;
120+
if let Some(row_filter) = row_filter {
121121
builder.with_row_filter(row_filter)
122122
} else {
123123
builder

datafusion/datasource-parquet/benches/parquet_struct_filter_pushdown.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,9 +210,9 @@ fn scan(
210210

211211
let mut filter_applied = false;
212212
let builder = if pushdown {
213-
if let Some(row_filter) =
214-
build_row_filter(predicate, file_schema, &metadata, false, &file_metrics)?
215-
{
213+
let (row_filter, _rejected) =
214+
build_row_filter(predicate, file_schema, &metadata, false, &file_metrics)?;
215+
if let Some(row_filter) = row_filter {
216216
filter_applied = true;
217217
builder.with_row_filter(row_filter)
218218
} else {

datafusion/datasource-parquet/src/metrics.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,14 @@ pub struct ParquetFileMetrics {
5959
pub pushdown_rows_matched: Count,
6060
/// Total time spent evaluating row-level pushdown filters
6161
pub row_pushdown_eval_time: Time,
62+
/// Total rows filtered out by the in-scan post-scan filter
63+
/// (predicate conjuncts that could not be applied as a parquet
64+
/// `RowFilter` and were instead evaluated on decoded batches).
65+
pub post_scan_rows_pruned: Count,
66+
/// Total rows that passed the in-scan post-scan filter.
67+
pub post_scan_rows_matched: Count,
68+
/// Total time spent evaluating the in-scan post-scan filter.
69+
pub post_scan_filter_eval_time: Time,
6270
/// Total time spent evaluating row group-level statistics filters
6371
pub statistics_eval_time: Time,
6472
/// Total time spent evaluating row group Bloom Filters
@@ -167,6 +175,18 @@ impl ParquetFileMetrics {
167175
let row_pushdown_eval_time = MetricBuilder::new(metrics)
168176
.with_new_label("filename", filename.to_string())
169177
.subset_time("row_pushdown_eval_time", partition);
178+
179+
let post_scan_rows_pruned = MetricBuilder::new(metrics)
180+
.with_new_label("filename", filename.to_string())
181+
.with_category(MetricCategory::Rows)
182+
.counter("post_scan_rows_pruned", partition);
183+
let post_scan_rows_matched = MetricBuilder::new(metrics)
184+
.with_new_label("filename", filename.to_string())
185+
.with_category(MetricCategory::Rows)
186+
.counter("post_scan_rows_matched", partition);
187+
let post_scan_filter_eval_time = MetricBuilder::new(metrics)
188+
.with_new_label("filename", filename.to_string())
189+
.subset_time("post_scan_filter_eval_time", partition);
170190
let statistics_eval_time = MetricBuilder::new(metrics)
171191
.with_new_label("filename", filename.to_string())
172192
.subset_time("statistics_eval_time", partition);
@@ -202,6 +222,9 @@ impl ParquetFileMetrics {
202222
pushdown_rows_pruned,
203223
pushdown_rows_matched,
204224
row_pushdown_eval_time,
225+
post_scan_rows_pruned,
226+
post_scan_rows_matched,
227+
post_scan_filter_eval_time,
205228
page_index_rows_pruned,
206229
page_index_pages_pruned,
207230
statistics_eval_time,

datafusion/datasource-parquet/src/opener/mod.rs

Lines changed: 126 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1149,31 +1149,38 @@ impl RowGroupsPrunedParquetOpen {
11491149

11501150
let arrow_reader_metrics = ArrowReaderMetrics::enabled();
11511151

1152-
// Build the decoder projection (mask + projector + replace_schema) in
1153-
// a single call. Encapsulating it behind `DecoderProjection` keeps
1154-
// the opener's orchestration body focused on filter / decoder /
1155-
// stream wiring, and gives a clean seam for the in-scan post-scan
1156-
// filter introduced in a later change.
1152+
// Build the `RowFilter` first so we can collect any conjuncts it
1153+
// could not place. Those rejected conjuncts must still be applied
1154+
// (`try_pushdown_filters` already removed the parent `FilterExec`),
1155+
// so they fall through to the in-scan post-scan filter.
1156+
let pushdown_predicate = prepared
1157+
.pushdown_filters
1158+
.then_some(prepared.predicate.as_ref())
1159+
.flatten();
1160+
let mut row_filter_generator = RowFilterGenerator::new(
1161+
pushdown_predicate,
1162+
&prepared.physical_file_schema,
1163+
file_metadata.as_ref(),
1164+
prepared.reorder_predicates,
1165+
&prepared.file_metrics,
1166+
);
1167+
let post_scan_conjuncts: Vec<Arc<dyn PhysicalExpr>> = if prepared.pushdown_filters
1168+
{
1169+
row_filter_generator.rejected_conjuncts().to_vec()
1170+
} else {
1171+
Vec::new()
1172+
};
1173+
11571174
let decoder_projection = DecoderProjection::build(
11581175
&prepared.projection,
1176+
&post_scan_conjuncts,
11591177
&prepared.physical_file_schema,
11601178
reader_metadata.parquet_schema(),
11611179
&prepared.output_schema,
1180+
&prepared.file_metrics,
11621181
)?;
11631182

11641183
let (decoder, pending_decoders, remaining_limit) = {
1165-
let pushdown_predicate = prepared
1166-
.pushdown_filters
1167-
.then_some(prepared.predicate.as_ref())
1168-
.flatten();
1169-
let mut row_filter_generator = RowFilterGenerator::new(
1170-
pushdown_predicate,
1171-
&prepared.physical_file_schema,
1172-
file_metadata.as_ref(),
1173-
prepared.reorder_predicates,
1174-
&prepared.file_metrics,
1175-
);
1176-
11771184
// Split into consecutive runs of row groups that share the same filter
11781185
// requirement. Fully matched row groups skip the RowFilter; others need it.
11791186
// Reverse the run order for reverse scans so the combined decoder stream
@@ -1183,8 +1190,14 @@ impl RowGroupsPrunedParquetOpen {
11831190
runs.reverse();
11841191
}
11851192
let run_count = runs.len();
1186-
let decoder_limit = prepared.limit.filter(|_| run_count == 1);
1187-
let remaining_limit = prepared.limit.filter(|_| run_count > 1);
1193+
// Decoder-local limits are only safe when no post-decode work can
1194+
// reject rows. A post-scan filter can — so when one is present
1195+
// the limit must be enforced at the stream level via
1196+
// `remaining_limit`.
1197+
let single_no_post_scan =
1198+
run_count == 1 && decoder_projection.post_scan_filter.is_none();
1199+
let decoder_limit = prepared.limit.filter(|_| single_no_post_scan);
1200+
let remaining_limit = prepared.limit.filter(|_| !single_no_post_scan);
11881201

11891202
let decoder_config = DecoderBuilderConfig {
11901203
projection_mask: &decoder_projection.projection_mask,
@@ -1229,6 +1242,7 @@ impl RowGroupsPrunedParquetOpen {
12291242
projection_mask: _,
12301243
projector,
12311244
replace_schema,
1245+
post_scan_filter,
12321246
} = decoder_projection;
12331247
let output_schema = Arc::clone(&prepared.output_schema);
12341248
let files_ranges_pruned_statistics =
@@ -1245,6 +1259,7 @@ impl RowGroupsPrunedParquetOpen {
12451259
predicate_cache_inner_records,
12461260
predicate_cache_records,
12471261
baseline_metrics: prepared.baseline_metrics,
1262+
post_scan_filter,
12481263
}
12491264
.into_stream();
12501265

@@ -2673,4 +2688,96 @@ mod test {
26732688
assert!(runs[2].needs_filter);
26742689
assert_eq!(runs[2].access_plan.row_group_indexes(), vec![3]);
26752690
}
2691+
2692+
/// End-to-end regression test for the "drop-on-floor" bug fixed by
2693+
/// `build_row_filter` now returning rejected conjuncts and the opener
2694+
/// routing them to the post-scan filter.
2695+
///
2696+
/// Setup: a parquet file with a struct column where some rows have a NULL
2697+
/// struct. Predicate `s IS NOT NULL` is set on the source with
2698+
/// `pushdown_filters = true`. `ParquetSource::try_pushdown_filters` would
2699+
/// have already removed the parent `FilterExec` (the conjunct is pushable
2700+
/// at table schema level). Inside `build_row_filter`,
2701+
/// `FilterCandidateBuilder::build` rejects the whole-struct reference as
2702+
/// non-primitive.
2703+
///
2704+
/// Before the fix the rejected conjunct was silently dropped, leaving the
2705+
/// scan with no `RowFilter` and no post-scan filter, so every row was
2706+
/// returned — i.e. the predicate was relaxed and the query returned wrong
2707+
/// results. After the fix the conjunct is surfaced and applied as a
2708+
/// post-scan filter, so only the rows with a non-null struct survive.
2709+
#[tokio::test]
2710+
async fn rejected_struct_conjunct_runs_post_scan_not_dropped() {
2711+
use arrow::array::{Int32Array, StringArray, StructArray};
2712+
use arrow::buffer::NullBuffer;
2713+
use arrow::datatypes::Fields;
2714+
2715+
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
2716+
2717+
// Schema: id (Int32), s (Struct{value: Int32, label: Utf8}).
2718+
let struct_fields: Fields = vec![
2719+
Arc::new(Field::new("value", DataType::Int32, true)),
2720+
Arc::new(Field::new("label", DataType::Utf8, true)),
2721+
]
2722+
.into();
2723+
let schema = Arc::new(Schema::new(vec![
2724+
Field::new("id", DataType::Int32, false),
2725+
Field::new("s", DataType::Struct(struct_fields.clone()), true),
2726+
]));
2727+
2728+
// Data: rows 0 and 2 have a non-null struct, row 1 is null.
2729+
let batch = RecordBatch::try_new(
2730+
Arc::clone(&schema),
2731+
vec![
2732+
Arc::new(Int32Array::from(vec![1, 2, 3])),
2733+
Arc::new(StructArray::new(
2734+
struct_fields,
2735+
vec![
2736+
Arc::new(Int32Array::from(vec![Some(10), None, Some(30)])) as _,
2737+
Arc::new(StringArray::from(vec![Some("a"), None, Some("c")]))
2738+
as _,
2739+
],
2740+
Some(NullBuffer::from(vec![true, false, true])),
2741+
)),
2742+
],
2743+
)
2744+
.unwrap();
2745+
2746+
let data_size = write_parquet_batches(
2747+
Arc::clone(&store),
2748+
"rejected.parquet",
2749+
vec![batch],
2750+
None,
2751+
)
2752+
.await;
2753+
2754+
let file =
2755+
PartitionedFile::new("rejected.parquet".to_string(), data_size as u64);
2756+
2757+
// `s IS NOT NULL` references a whole struct, which `PushdownChecker`
2758+
// flags as non-primitive — `FilterCandidateBuilder::build` returns
2759+
// `Ok(None)` and the conjunct lands in `rejected`.
2760+
let predicate = logical2physical(&col("s").is_not_null(), &schema);
2761+
2762+
let morselizer = ParquetMorselizerBuilder::new()
2763+
.with_store(Arc::clone(&store))
2764+
.with_schema(Arc::clone(&schema))
2765+
.with_predicate(predicate)
2766+
// The RowFilter path: emulates the post-`try_pushdown_filters`
2767+
// state where the parent `FilterExec` has already been removed
2768+
// and the scan owns the conjunct.
2769+
.with_pushdown_filters(true)
2770+
.build();
2771+
2772+
let stream = open_file(&morselizer, file).await.unwrap();
2773+
let (_, rows) = count_batches_and_rows(stream).await;
2774+
2775+
// 2 rows have a non-null struct. Before the fix this returned 3
2776+
// (the conjunct was silently dropped).
2777+
assert_eq!(
2778+
rows, 2,
2779+
"expected 2 rows with non-null struct; the rejected conjunct must \
2780+
be applied post-scan, not silently dropped"
2781+
);
2782+
}
26762783
}

0 commit comments

Comments
 (0)