Skip to content

Commit fcd521c

Browse files
committed
Fix fully matched row groups with null counts
1 parent 8f033e4 commit fcd521c

1 file changed

Lines changed: 94 additions & 6 deletions

File tree

datafusion/datasource-parquet/src/row_group_filter.rs

Lines changed: 94 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ use arrow::datatypes::Schema;
2424
use datafusion_common::pruning::PruningStatistics;
2525
use datafusion_common::{Column, Result, ScalarValue};
2626
use datafusion_datasource::FileRange;
27-
use datafusion_physical_expr::PhysicalExprSimplifier;
28-
use datafusion_physical_expr::expressions::NotExpr;
27+
use datafusion_expr::Operator;
28+
use datafusion_physical_expr::expressions::{BinaryExpr, IsNullExpr, NotExpr};
29+
use datafusion_physical_expr::utils::collect_columns;
30+
use datafusion_physical_expr::{PhysicalExpr, PhysicalExprSimplifier};
2931
use datafusion_pruning::PruningPredicate;
3032
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
3133
use parquet::basic::Type;
@@ -272,6 +274,7 @@ impl RowGroupAccessPlanFilter {
272274
parquet_schema,
273275
row_group_metadatas,
274276
arrow_schema,
277+
missing_null_counts_as_zero: true,
275278
};
276279

277280
// try to prune the row groups in a single call
@@ -327,10 +330,33 @@ impl RowGroupAccessPlanFilter {
327330
return;
328331
}
329332

330-
// Use NotExpr to create the inverted predicate
331-
let inverted_expr = Arc::new(NotExpr::new(Arc::clone(predicate.orig_expr())));
333+
let mut inverted_expr: Arc<dyn PhysicalExpr> =
334+
Arc::new(NotExpr::new(Arc::clone(predicate.orig_expr())));
332335

333-
// Simplify the NOT expression (e.g., NOT(c1 = 0) -> c1 != 0)
336+
// Rows where the predicate evaluates to NULL do not pass the filter.
337+
// Include NULL checks in the inverted expression so a row group is only
338+
// considered fully matched when every referenced column is known non-null.
339+
// This is conservative for null-accepting predicates, but fully matched
340+
// row groups must not have false positives.
341+
let mut columns = collect_columns(predicate.orig_expr())
342+
.into_iter()
343+
.filter(|column| arrow_schema.field(column.index()).is_nullable())
344+
.collect::<Vec<_>>();
345+
columns.sort_by(|a, b| {
346+
a.index()
347+
.cmp(&b.index())
348+
.then_with(|| a.name().cmp(b.name()))
349+
});
350+
351+
for column in columns {
352+
inverted_expr = Arc::new(BinaryExpr::new(
353+
inverted_expr,
354+
Operator::Or,
355+
Arc::new(IsNullExpr::new(Arc::new(column))),
356+
));
357+
}
358+
359+
// Simplify the inverted expression (e.g., NOT(c1 = 0) -> c1 != 0)
334360
// before building the pruning predicate
335361
let simplifier = PhysicalExprSimplifier::new(arrow_schema);
336362
let Ok(inverted_expr) = simplifier.simplify(inverted_expr) else {
@@ -350,6 +376,7 @@ impl RowGroupAccessPlanFilter {
350376
.map(|&i| &groups[i])
351377
.collect::<Vec<_>>(),
352378
arrow_schema,
379+
missing_null_counts_as_zero: false,
353380
};
354381

355382
let Ok(inverted_values) = inverted_predicate.prune(&inverted_pruning_stats)
@@ -582,6 +609,7 @@ struct RowGroupPruningStatistics<'a> {
582609
parquet_schema: &'a SchemaDescriptor,
583610
row_group_metadatas: Vec<&'a RowGroupMetaData>,
584611
arrow_schema: &'a Schema,
612+
missing_null_counts_as_zero: bool,
585613
}
586614

587615
impl<'a> RowGroupPruningStatistics<'a> {
@@ -598,7 +626,8 @@ impl<'a> RowGroupPruningStatistics<'a> {
598626
&column.name,
599627
self.arrow_schema,
600628
self.parquet_schema,
601-
)?)
629+
)?
630+
.with_missing_null_counts_as_zero(self.missing_null_counts_as_zero))
602631
}
603632
}
604633

@@ -767,6 +796,65 @@ mod tests {
767796
assert_pruned(row_groups, ExpectedPruning::Some(vec![1]))
768797
}
769798

799+
#[test]
800+
fn row_group_fully_matched_requires_known_non_null_predicate_columns() {
801+
use datafusion_expr::{col, lit};
802+
803+
let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
804+
let expr = logical2physical(&col("c1").gt(lit(15)), &schema);
805+
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
806+
807+
let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
808+
let schema_descr = get_test_schema_descr(vec![field]);
809+
810+
// All three row groups have non-null values in the predicate range,
811+
// so none are pruned. Only the second row group can be proven fully
812+
// matched because it is the only one with a known zero null count.
813+
let rg_with_null = get_row_group_meta_data(
814+
&schema_descr,
815+
vec![ParquetStatistics::int32(
816+
Some(16),
817+
Some(20),
818+
None,
819+
Some(1),
820+
false,
821+
)],
822+
);
823+
let rg_without_null = get_row_group_meta_data(
824+
&schema_descr,
825+
vec![ParquetStatistics::int32(
826+
Some(16),
827+
Some(20),
828+
None,
829+
Some(0),
830+
false,
831+
)],
832+
);
833+
let rg_unknown_null_count = get_row_group_meta_data(
834+
&schema_descr,
835+
vec![ParquetStatistics::int32(
836+
Some(16),
837+
Some(20),
838+
None,
839+
None,
840+
false,
841+
)],
842+
);
843+
844+
let metrics = parquet_file_metrics();
845+
let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(3));
846+
row_groups.prune_by_statistics(
847+
&schema,
848+
&schema_descr,
849+
&[rg_with_null, rg_without_null, rg_unknown_null_count],
850+
&pruning_predicate,
851+
&metrics,
852+
);
853+
854+
assert_eq!(row_groups.access_plan.row_group_indexes(), vec![0, 1, 2]);
855+
assert_eq!(row_groups.is_fully_matched(), &vec![false, true, false]);
856+
}
857+
770858
#[test]
771859
fn row_group_pruning_predicate_missing_stats() {
772860
use datafusion_expr::{col, lit};

0 commit comments

Comments
 (0)