Skip to content

Commit 4d1c409

Browse files
committed
fix not expr
1 parent 8c0e06a commit 4d1c409

5 files changed

Lines changed: 343 additions & 22 deletions

File tree

datafusion/core/tests/parquet/row_group_pruning.rs

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -206,16 +206,6 @@ impl RowGroupPruningTest {
206206
self.expected_files_pruned_by_statistics,
207207
"mismatched files_ranges_pruned_statistics",
208208
);
209-
assert_eq!(
210-
output.row_groups_matched_bloom_filter(),
211-
self.expected_row_group_matched_by_bloom_filter,
212-
"mismatched row_groups_matched_bloom_filter",
213-
);
214-
assert_eq!(
215-
output.row_groups_pruned_bloom_filter(),
216-
self.expected_row_group_pruned_by_bloom_filter,
217-
"mismatched row_groups_pruned_bloom_filter",
218-
);
219209
assert_eq!(
220210
output.limit_pruned_row_groups(),
221211
self.expected_limit_pruned_row_groups,
@@ -1748,9 +1738,10 @@ async fn test_limit_pruning() -> datafusion_common::error::Result<()> {
17481738
let query = "explain verbose SELECT c1 FROM t WHERE c1 > 0 LIMIT 2";
17491739

17501740
let batches = vec![
1751-
make_i32_batch("c1", vec![1, 2])?, // RG0: Fully matched, 2 rows
1752-
make_i32_batch("c1", vec![3, 4])?, // RG1: Fully matched, 2 rows
1753-
make_i32_batch("c1", vec![5, 6])?, // RG2: Fully matched, 2 rows
1741+
make_i32_batch("c1", vec![0, -2])?,
1742+
make_i32_batch("c1", vec![0, 0])?, // RG0: Fully matched, 2 rows
1743+
make_i32_batch("c1", vec![0, 0])?, // RG1: Fully matched, 2 rows
1744+
make_i32_batch("c1", vec![0, 0])?, // RG2: Fully matched, 2 rows
17541745
make_i32_batch("c1", vec![-1, 0])?, // RG3: Pruned by statistics, 0 rows
17551746
];
17561747

@@ -1760,12 +1751,10 @@ async fn test_limit_pruning() -> datafusion_common::error::Result<()> {
17601751
.with_expected_errors(Some(0))
17611752
.with_expected_rows(2)
17621753
.with_pruned_files(Some(0))
1763-
.with_matched_by_bloom_filter(Some(0))
1764-
.with_pruned_by_bloom_filter(Some(0))
1765-
.with_matched_by_stats(Some(3)) // RG0, RG1, RG2 are matched by stats (c1 > 0)
1766-
.with_pruned_by_stats(Some(1)) // RG3 is pruned by stats (c1 = [-1, 0] does not satisfy c1 > 0)
1767-
// .with_limit_pruned_row_groups(Some(2)) // RG1, RG2 are pruned by limit. (RG3 is already pruned by stats)
1768-
.test_row_group_prune_with_custom_data(schema, batches)
1754+
.with_matched_by_stats(Some(5)) // all 5 row groups are expected to be matched by stats
1755+
.with_pruned_by_stats(Some(0)) // no row group is expected to be pruned by stats
1756+
.with_limit_pruned_row_groups(Some(4)) // 4 row groups are expected to be pruned by limit (LIMIT 2 is satisfied by one 2-row group)
1757+
.test_row_group_prune_with_custom_data(schema, batches, 2)
17691758
.await;
17701759

17711760
Ok(())

datafusion/datasource-parquet/src/row_group_filter.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use datafusion_common::pruning::PruningStatistics;
2525
use datafusion_common::{Column, Result, ScalarValue};
2626
use datafusion_datasource::FileRange;
2727
use datafusion_physical_expr::expressions::NotExpr;
28+
use datafusion_physical_expr::PhysicalExprSimplifier;
2829
use datafusion_pruning::PruningPredicate;
2930
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
3031
use parquet::arrow::parquet_column;
@@ -204,6 +205,10 @@ impl RowGroupAccessPlanFilter {
204205
// Use NotExpr to create the inverted predicate
205206
let inverted_expr =
206207
Arc::new(NotExpr::new(predicate.orig_expr().clone()));
208+
// Simplify the NOT expression (e.g., NOT(c1 = 0) -> c1 != 0)
209+
// before building the pruning predicate
210+
let mut simplifier = PhysicalExprSimplifier::new(arrow_schema);
211+
let inverted_expr = simplifier.simplify(inverted_expr).unwrap();
207212
if let Ok(inverted_predicate) = PruningPredicate::try_new(
208213
inverted_expr,
209214
predicate.schema().clone(),

datafusion/physical-expr/src/simplifier/mod.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ use datafusion_common::{
2424
};
2525
use std::sync::Arc;
2626

27-
use crate::PhysicalExpr;
27+
use crate::{simplifier::not::simplify_not_expr_recursive, PhysicalExpr};
2828

29+
pub mod not;
2930
pub mod unwrap_cast;
3031

3132
/// Simplifies physical expressions by applying various optimizations
@@ -56,6 +57,11 @@ impl<'a> TreeNodeRewriter for PhysicalExprSimplifier<'a> {
5657
type Node = Arc<dyn PhysicalExpr>;
5758

5859
fn f_up(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
60+
// Apply NOT expression simplification first
61+
let not_simplified = simplify_not_expr_recursive(node, self.schema)?;
62+
let node = not_simplified.data;
63+
let transformed = not_simplified.transformed;
64+
5965
// Apply unwrap cast optimization
6066
#[cfg(test)]
6167
let original_type = node.data_type(self.schema).unwrap();
@@ -66,7 +72,12 @@ impl<'a> TreeNodeRewriter for PhysicalExprSimplifier<'a> {
6672
original_type,
6773
"Simplified expression should have the same data type as the original"
6874
);
69-
Ok(unwrapped)
75+
// Combine transformation results
76+
let final_transformed = transformed || unwrapped.transformed;
77+
Ok(Transformed::new_transformed(
78+
unwrapped.data,
79+
final_transformed,
80+
))
7081
}
7182
}
7283

0 commit comments

Comments
 (0)