Skip to content

Commit 95b417e

Browse files
committed
Set distinct_count to 1 when filter narrows interval to single value
When a filter predicate collapses a column's interval to a single value (e.g. d_qoy = 1), the output has exactly 1 distinct value. Previously the original Parquet NDV was propagated, inflating GROUP BY output estimates for CTE self-join patterns like Q31.
1 parent bc2b36c commit 95b417e

2 files changed

Lines changed: 205 additions & 7 deletions

File tree

datafusion/physical-plan/src/filter.rs

Lines changed: 202 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -812,15 +812,23 @@ fn collect_new_statistics(
812812
};
813813
};
814814
let (lower, upper) = interval.into_bounds();
815-
let is_exact = !lower.is_null() && !upper.is_null() && lower == upper;
816-
let min_value = interval_bound_to_precision(lower, is_exact);
817-
let max_value = interval_bound_to_precision(upper, is_exact);
815+
let is_single_value =
816+
!lower.is_null() && !upper.is_null() && lower == upper;
817+
let min_value = interval_bound_to_precision(lower, is_single_value);
818+
let max_value = interval_bound_to_precision(upper, is_single_value);
819+
// When the interval collapses to a single value (equality
820+
// predicate), the column has exactly 1 distinct value
821+
let capped_distinct_count = if is_single_value {
822+
Precision::Exact(1)
823+
} else {
824+
distinct_count.to_inexact()
825+
};
818826
ColumnStatistics {
819827
null_count: input_column_stats[idx].null_count.to_inexact(),
820828
max_value,
821829
min_value,
822830
sum_value: Precision::Absent,
823-
distinct_count: distinct_count.to_inexact(),
831+
distinct_count: capped_distinct_count,
824832
byte_size: input_column_stats[idx].byte_size,
825833
}
826834
},
@@ -2246,4 +2254,194 @@ mod tests {
22462254

22472255
Ok(())
22482256
}
2257+
2258+
#[tokio::test]
2259+
async fn test_filter_statistics_equality_sets_ndv_to_one() -> Result<()> {
2260+
// a: min=1, max=100, ndv=80
2261+
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2262+
let input = Arc::new(StatisticsExec::new(
2263+
Statistics {
2264+
num_rows: Precision::Inexact(100),
2265+
total_byte_size: Precision::Inexact(400),
2266+
column_statistics: vec![ColumnStatistics {
2267+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2268+
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
2269+
distinct_count: Precision::Inexact(80),
2270+
..Default::default()
2271+
}],
2272+
},
2273+
schema.clone(),
2274+
));
2275+
2276+
// a = 42 collapses interval to a single value
2277+
let predicate = Arc::new(BinaryExpr::new(
2278+
Arc::new(Column::new("a", 0)),
2279+
Operator::Eq,
2280+
Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2281+
));
2282+
let filter: Arc<dyn ExecutionPlan> =
2283+
Arc::new(FilterExec::try_new(predicate, input)?);
2284+
let statistics = filter.partition_statistics(None)?;
2285+
assert_eq!(
2286+
statistics.column_statistics[0].distinct_count,
2287+
Precision::Exact(1)
2288+
);
2289+
Ok(())
2290+
}
2291+
2292+
#[tokio::test]
2293+
async fn test_filter_statistics_or_equality_preserves_ndv() -> Result<()> {
2294+
// a: min=1, max=100, ndv=80
2295+
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2296+
let input = Arc::new(StatisticsExec::new(
2297+
Statistics {
2298+
num_rows: Precision::Inexact(100),
2299+
total_byte_size: Precision::Inexact(400),
2300+
column_statistics: vec![ColumnStatistics {
2301+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2302+
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
2303+
distinct_count: Precision::Inexact(80),
2304+
..Default::default()
2305+
}],
2306+
},
2307+
schema.clone(),
2308+
));
2309+
2310+
// a = 42 OR a = 22: interval stays [1, 100], not a single value
2311+
let predicate = Arc::new(BinaryExpr::new(
2312+
Arc::new(BinaryExpr::new(
2313+
Arc::new(Column::new("a", 0)),
2314+
Operator::Eq,
2315+
Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2316+
)),
2317+
Operator::Or,
2318+
Arc::new(BinaryExpr::new(
2319+
Arc::new(Column::new("a", 0)),
2320+
Operator::Eq,
2321+
Arc::new(Literal::new(ScalarValue::Int32(Some(22)))),
2322+
)),
2323+
));
2324+
let filter: Arc<dyn ExecutionPlan> =
2325+
Arc::new(FilterExec::try_new(predicate, input)?);
2326+
let statistics = filter.partition_statistics(None)?;
2327+
assert_eq!(
2328+
statistics.column_statistics[0].distinct_count,
2329+
Precision::Inexact(80)
2330+
);
2331+
Ok(())
2332+
}
2333+
2334+
#[tokio::test]
2335+
async fn test_filter_statistics_and_equality_ndv() -> Result<()> {
2336+
// a: min=1, max=100, ndv=80
2337+
// b: min=1, max=50, ndv=40
2338+
// c: min=1, max=200, ndv=150
2339+
let schema = Schema::new(vec![
2340+
Field::new("a", DataType::Int32, false),
2341+
Field::new("b", DataType::Int32, false),
2342+
Field::new("c", DataType::Int32, false),
2343+
]);
2344+
let input = Arc::new(StatisticsExec::new(
2345+
Statistics {
2346+
num_rows: Precision::Inexact(100),
2347+
total_byte_size: Precision::Inexact(1200),
2348+
column_statistics: vec![
2349+
ColumnStatistics {
2350+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2351+
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
2352+
distinct_count: Precision::Inexact(80),
2353+
..Default::default()
2354+
},
2355+
ColumnStatistics {
2356+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2357+
max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
2358+
distinct_count: Precision::Inexact(40),
2359+
..Default::default()
2360+
},
2361+
ColumnStatistics {
2362+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2363+
max_value: Precision::Inexact(ScalarValue::Int32(Some(200))),
2364+
distinct_count: Precision::Inexact(150),
2365+
..Default::default()
2366+
},
2367+
],
2368+
},
2369+
schema.clone(),
2370+
));
2371+
2372+
// a = 42 AND b > 10 AND c = 7
2373+
let predicate = Arc::new(BinaryExpr::new(
2374+
Arc::new(BinaryExpr::new(
2375+
Arc::new(BinaryExpr::new(
2376+
Arc::new(Column::new("a", 0)),
2377+
Operator::Eq,
2378+
Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2379+
)),
2380+
Operator::And,
2381+
Arc::new(BinaryExpr::new(
2382+
Arc::new(Column::new("b", 1)),
2383+
Operator::Gt,
2384+
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2385+
)),
2386+
)),
2387+
Operator::And,
2388+
Arc::new(BinaryExpr::new(
2389+
Arc::new(Column::new("c", 2)),
2390+
Operator::Eq,
2391+
Arc::new(Literal::new(ScalarValue::Int32(Some(7)))),
2392+
)),
2393+
));
2394+
let filter: Arc<dyn ExecutionPlan> =
2395+
Arc::new(FilterExec::try_new(predicate, input)?);
2396+
let statistics = filter.partition_statistics(None)?;
2397+
// a = 42 collapses to single value
2398+
assert_eq!(
2399+
statistics.column_statistics[0].distinct_count,
2400+
Precision::Exact(1)
2401+
);
2402+
// b > 10 narrows to [11, 50] but doesn't collapse
2403+
assert_eq!(
2404+
statistics.column_statistics[1].distinct_count,
2405+
Precision::Inexact(40)
2406+
);
2407+
// c = 7 collapses to single value
2408+
assert_eq!(
2409+
statistics.column_statistics[2].distinct_count,
2410+
Precision::Exact(1)
2411+
);
2412+
Ok(())
2413+
}
2414+
2415+
#[tokio::test]
2416+
async fn test_filter_statistics_equality_absent_bounds_ndv() -> Result<()> {
2417+
// a: ndv=80, no min/max
2418+
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2419+
let input = Arc::new(StatisticsExec::new(
2420+
Statistics {
2421+
num_rows: Precision::Inexact(100),
2422+
total_byte_size: Precision::Inexact(400),
2423+
column_statistics: vec![ColumnStatistics {
2424+
distinct_count: Precision::Inexact(80),
2425+
..Default::default()
2426+
}],
2427+
},
2428+
schema.clone(),
2429+
));
2430+
2431+
// a = 42: even without known bounds, interval analysis resolves
2432+
// the equality to [42, 42], so NDV is correctly set to Exact(1)
2433+
let predicate = Arc::new(BinaryExpr::new(
2434+
Arc::new(Column::new("a", 0)),
2435+
Operator::Eq,
2436+
Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2437+
));
2438+
let filter: Arc<dyn ExecutionPlan> =
2439+
Arc::new(FilterExec::try_new(predicate, input)?);
2440+
let statistics = filter.partition_statistics(None)?;
2441+
assert_eq!(
2442+
statistics.column_statistics[0].distinct_count,
2443+
Precision::Exact(1)
2444+
);
2445+
Ok(())
2446+
}
22492447
}

datafusion/sqllogictest/test_files/parquet_statistics.slt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ query TT
5959
EXPLAIN SELECT * FROM test_table WHERE column1 = 1;
6060
----
6161
physical_plan
62-
01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) ScanBytes=Inexact(40))]]
62+
01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) Distinct=Exact(1) ScanBytes=Inexact(40))]]
6363
02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]]
6464
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]]
6565

@@ -84,7 +84,7 @@ query TT
8484
EXPLAIN SELECT * FROM test_table WHERE column1 = 1;
8585
----
8686
physical_plan
87-
01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) ScanBytes=Inexact(40))]]
87+
01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) Distinct=Exact(1) ScanBytes=Inexact(40))]]
8888
02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]]
8989
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]]
9090

@@ -109,7 +109,7 @@ query TT
109109
EXPLAIN SELECT * FROM test_table WHERE column1 = 1;
110110
----
111111
physical_plan
112-
01)FilterExec: column1@0 = 1, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)))]]
112+
01)FilterExec: column1@0 = 1, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Distinct=Exact(1))]]
113113
02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:)]]
114114
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:)]]
115115

0 commit comments

Comments
 (0)