Skip to content

Commit 0613f1a

Browse files
committed
Set distinct_count to 1 when filter narrows interval to single value
When a filter predicate collapses a column's interval to a single value (e.g. d_qoy = 1), the output has exactly 1 distinct value. Previously the original Parquet NDV was propagated, inflating GROUP BY output estimates for CTE self-join patterns like Q31.
1 parent 9885f4b commit 0613f1a

2 files changed

Lines changed: 205 additions & 7 deletions

File tree

datafusion/physical-plan/src/filter.rs

Lines changed: 202 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -839,15 +839,23 @@ fn collect_new_statistics(
839839
};
840840
};
841841
let (lower, upper) = interval.into_bounds();
842-
let is_exact = !lower.is_null() && !upper.is_null() && lower == upper;
843-
let min_value = interval_bound_to_precision(lower, is_exact);
844-
let max_value = interval_bound_to_precision(upper, is_exact);
842+
let is_single_value =
843+
!lower.is_null() && !upper.is_null() && lower == upper;
844+
let min_value = interval_bound_to_precision(lower, is_single_value);
845+
let max_value = interval_bound_to_precision(upper, is_single_value);
846+
// When the interval collapses to a single value (equality
847+
// predicate), the column has exactly 1 distinct value
848+
let capped_distinct_count = if is_single_value {
849+
Precision::Exact(1)
850+
} else {
851+
distinct_count.to_inexact()
852+
};
845853
ColumnStatistics {
846854
null_count: input_column_stats[idx].null_count.to_inexact(),
847855
max_value,
848856
min_value,
849857
sum_value: Precision::Absent,
850-
distinct_count: distinct_count.to_inexact(),
858+
distinct_count: capped_distinct_count,
851859
byte_size: input_column_stats[idx].byte_size,
852860
}
853861
},
@@ -2274,4 +2282,194 @@ mod tests {
22742282

22752283
Ok(())
22762284
}
2285+
2286+
#[tokio::test]
2287+
async fn test_filter_statistics_equality_sets_ndv_to_one() -> Result<()> {
2288+
// a: min=1, max=100, ndv=80
2289+
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2290+
let input = Arc::new(StatisticsExec::new(
2291+
Statistics {
2292+
num_rows: Precision::Inexact(100),
2293+
total_byte_size: Precision::Inexact(400),
2294+
column_statistics: vec![ColumnStatistics {
2295+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2296+
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
2297+
distinct_count: Precision::Inexact(80),
2298+
..Default::default()
2299+
}],
2300+
},
2301+
schema.clone(),
2302+
));
2303+
2304+
// a = 42 collapses interval to a single value
2305+
let predicate = Arc::new(BinaryExpr::new(
2306+
Arc::new(Column::new("a", 0)),
2307+
Operator::Eq,
2308+
Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2309+
));
2310+
let filter: Arc<dyn ExecutionPlan> =
2311+
Arc::new(FilterExec::try_new(predicate, input)?);
2312+
let statistics = filter.partition_statistics(None)?;
2313+
assert_eq!(
2314+
statistics.column_statistics[0].distinct_count,
2315+
Precision::Exact(1)
2316+
);
2317+
Ok(())
2318+
}
2319+
2320+
#[tokio::test]
2321+
async fn test_filter_statistics_or_equality_preserves_ndv() -> Result<()> {
2322+
// a: min=1, max=100, ndv=80
2323+
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2324+
let input = Arc::new(StatisticsExec::new(
2325+
Statistics {
2326+
num_rows: Precision::Inexact(100),
2327+
total_byte_size: Precision::Inexact(400),
2328+
column_statistics: vec![ColumnStatistics {
2329+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2330+
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
2331+
distinct_count: Precision::Inexact(80),
2332+
..Default::default()
2333+
}],
2334+
},
2335+
schema.clone(),
2336+
));
2337+
2338+
// a = 42 OR a = 22: interval stays [1, 100], not a single value
2339+
let predicate = Arc::new(BinaryExpr::new(
2340+
Arc::new(BinaryExpr::new(
2341+
Arc::new(Column::new("a", 0)),
2342+
Operator::Eq,
2343+
Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2344+
)),
2345+
Operator::Or,
2346+
Arc::new(BinaryExpr::new(
2347+
Arc::new(Column::new("a", 0)),
2348+
Operator::Eq,
2349+
Arc::new(Literal::new(ScalarValue::Int32(Some(22)))),
2350+
)),
2351+
));
2352+
let filter: Arc<dyn ExecutionPlan> =
2353+
Arc::new(FilterExec::try_new(predicate, input)?);
2354+
let statistics = filter.partition_statistics(None)?;
2355+
assert_eq!(
2356+
statistics.column_statistics[0].distinct_count,
2357+
Precision::Inexact(80)
2358+
);
2359+
Ok(())
2360+
}
2361+
2362+
#[tokio::test]
2363+
async fn test_filter_statistics_and_equality_ndv() -> Result<()> {
2364+
// a: min=1, max=100, ndv=80
2365+
// b: min=1, max=50, ndv=40
2366+
// c: min=1, max=200, ndv=150
2367+
let schema = Schema::new(vec![
2368+
Field::new("a", DataType::Int32, false),
2369+
Field::new("b", DataType::Int32, false),
2370+
Field::new("c", DataType::Int32, false),
2371+
]);
2372+
let input = Arc::new(StatisticsExec::new(
2373+
Statistics {
2374+
num_rows: Precision::Inexact(100),
2375+
total_byte_size: Precision::Inexact(1200),
2376+
column_statistics: vec![
2377+
ColumnStatistics {
2378+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2379+
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
2380+
distinct_count: Precision::Inexact(80),
2381+
..Default::default()
2382+
},
2383+
ColumnStatistics {
2384+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2385+
max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
2386+
distinct_count: Precision::Inexact(40),
2387+
..Default::default()
2388+
},
2389+
ColumnStatistics {
2390+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2391+
max_value: Precision::Inexact(ScalarValue::Int32(Some(200))),
2392+
distinct_count: Precision::Inexact(150),
2393+
..Default::default()
2394+
},
2395+
],
2396+
},
2397+
schema.clone(),
2398+
));
2399+
2400+
// a = 42 AND b > 10 AND c = 7
2401+
let predicate = Arc::new(BinaryExpr::new(
2402+
Arc::new(BinaryExpr::new(
2403+
Arc::new(BinaryExpr::new(
2404+
Arc::new(Column::new("a", 0)),
2405+
Operator::Eq,
2406+
Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2407+
)),
2408+
Operator::And,
2409+
Arc::new(BinaryExpr::new(
2410+
Arc::new(Column::new("b", 1)),
2411+
Operator::Gt,
2412+
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2413+
)),
2414+
)),
2415+
Operator::And,
2416+
Arc::new(BinaryExpr::new(
2417+
Arc::new(Column::new("c", 2)),
2418+
Operator::Eq,
2419+
Arc::new(Literal::new(ScalarValue::Int32(Some(7)))),
2420+
)),
2421+
));
2422+
let filter: Arc<dyn ExecutionPlan> =
2423+
Arc::new(FilterExec::try_new(predicate, input)?);
2424+
let statistics = filter.partition_statistics(None)?;
2425+
// a = 42 collapses to single value
2426+
assert_eq!(
2427+
statistics.column_statistics[0].distinct_count,
2428+
Precision::Exact(1)
2429+
);
2430+
// b > 10 narrows to [11, 50] but doesn't collapse
2431+
assert_eq!(
2432+
statistics.column_statistics[1].distinct_count,
2433+
Precision::Inexact(40)
2434+
);
2435+
// c = 7 collapses to single value
2436+
assert_eq!(
2437+
statistics.column_statistics[2].distinct_count,
2438+
Precision::Exact(1)
2439+
);
2440+
Ok(())
2441+
}
2442+
2443+
#[tokio::test]
2444+
async fn test_filter_statistics_equality_absent_bounds_ndv() -> Result<()> {
2445+
// a: ndv=80, no min/max
2446+
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2447+
let input = Arc::new(StatisticsExec::new(
2448+
Statistics {
2449+
num_rows: Precision::Inexact(100),
2450+
total_byte_size: Precision::Inexact(400),
2451+
column_statistics: vec![ColumnStatistics {
2452+
distinct_count: Precision::Inexact(80),
2453+
..Default::default()
2454+
}],
2455+
},
2456+
schema.clone(),
2457+
));
2458+
2459+
// a = 42: even without known bounds, interval analysis resolves
2460+
// the equality to [42, 42], so NDV is correctly set to Exact(1)
2461+
let predicate = Arc::new(BinaryExpr::new(
2462+
Arc::new(Column::new("a", 0)),
2463+
Operator::Eq,
2464+
Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
2465+
));
2466+
let filter: Arc<dyn ExecutionPlan> =
2467+
Arc::new(FilterExec::try_new(predicate, input)?);
2468+
let statistics = filter.partition_statistics(None)?;
2469+
assert_eq!(
2470+
statistics.column_statistics[0].distinct_count,
2471+
Precision::Exact(1)
2472+
);
2473+
Ok(())
2474+
}
22772475
}

datafusion/sqllogictest/test_files/parquet_statistics.slt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ query TT
5959
EXPLAIN SELECT * FROM test_table WHERE column1 = 1;
6060
----
6161
physical_plan
62-
01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) ScanBytes=Inexact(40))]]
62+
01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) Distinct=Exact(1) ScanBytes=Inexact(40))]]
6363
02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]]
6464
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]]
6565

@@ -84,7 +84,7 @@ query TT
8484
EXPLAIN SELECT * FROM test_table WHERE column1 = 1;
8585
----
8686
physical_plan
87-
01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) ScanBytes=Inexact(40))]]
87+
01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) Distinct=Exact(1) ScanBytes=Inexact(40))]]
8888
02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]]
8989
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]]
9090

@@ -110,7 +110,7 @@ query TT
110110
EXPLAIN SELECT * FROM test_table WHERE column1 = 1;
111111
----
112112
physical_plan
113-
01)FilterExec: column1@0 = 1, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)))]]
113+
01)FilterExec: column1@0 = 1, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Distinct=Exact(1))]]
114114
02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:)]]
115115
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:)]]
116116

0 commit comments

Comments
 (0)