Skip to content

Commit 6cd7e83

Browse files
authored
feat: extend interval analysis support for temporal types (#21520)
## Which issue does this PR close? - Closes #21111 related with #21109 and its PR #21473 ## Rationale for this change Temporal types can actually support internal arithmetics and with this change internal arithmetics can now narrow down column boundaries and selectivity instead of falling back to default selectivity of 1.0 ## What changes are included in this PR? Extend internal_arithmetic types with date and duration types ## Are these changes tested? Yes adjusted tests ## Are there any user-facing changes? no
1 parent 4a5d130 commit 6cd7e83

File tree

5 files changed

+151
-19
lines changed

5 files changed

+151
-19
lines changed

datafusion/common/src/scalar/mod.rs

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2526,6 +2526,25 @@ impl ScalarValue {
25262526
(Self::Float64(Some(l)), Self::Float64(Some(r))) => {
25272527
Some((l - r).abs().round() as _)
25282528
}
2529+
(Self::Date32(Some(l)), Self::Date32(Some(r))) => Some(l.abs_diff(*r) as _),
2530+
(Self::Date64(Some(l)), Self::Date64(Some(r))) => Some(l.abs_diff(*r) as _),
2531+
// Timestamp values are stored as epoch ticks regardless of timezone
2532+
// annotation, so the distance is tz-independent (tz is display metadata).
2533+
(Self::TimestampSecond(Some(l), _), Self::TimestampSecond(Some(r), _)) => {
2534+
Some(l.abs_diff(*r) as _)
2535+
}
2536+
(
2537+
Self::TimestampMillisecond(Some(l), _),
2538+
Self::TimestampMillisecond(Some(r), _),
2539+
) => Some(l.abs_diff(*r) as _),
2540+
(
2541+
Self::TimestampMicrosecond(Some(l), _),
2542+
Self::TimestampMicrosecond(Some(r), _),
2543+
) => Some(l.abs_diff(*r) as _),
2544+
(
2545+
Self::TimestampNanosecond(Some(l), _),
2546+
Self::TimestampNanosecond(Some(r), _),
2547+
) => Some(l.abs_diff(*r) as _),
25292548
(
25302549
Self::Decimal128(Some(l), lprecision, lscale),
25312550
Self::Decimal128(Some(r), rprecision, rscale),
@@ -8766,6 +8785,42 @@ mod tests {
87668785
ScalarValue::Decimal256(Some(10.into()), 1, 0),
87678786
5,
87688787
),
8788+
// Temporal types
8789+
(
8790+
ScalarValue::Date32(Some(0)),
8791+
ScalarValue::Date32(Some(10)),
8792+
10,
8793+
),
8794+
(
8795+
ScalarValue::Date32(Some(10)),
8796+
ScalarValue::Date32(Some(0)),
8797+
10,
8798+
),
8799+
(
8800+
ScalarValue::Date64(Some(1000)),
8801+
ScalarValue::Date64(Some(5000)),
8802+
4000,
8803+
),
8804+
(
8805+
ScalarValue::TimestampSecond(Some(100), None),
8806+
ScalarValue::TimestampSecond(Some(200), None),
8807+
100,
8808+
),
8809+
(
8810+
ScalarValue::TimestampMillisecond(Some(1000), None),
8811+
ScalarValue::TimestampMillisecond(Some(5000), None),
8812+
4000,
8813+
),
8814+
(
8815+
ScalarValue::TimestampMicrosecond(Some(0), None),
8816+
ScalarValue::TimestampMicrosecond(Some(1_000_000), None),
8817+
1_000_000,
8818+
),
8819+
(
8820+
ScalarValue::TimestampNanosecond(Some(1_000_000_000), None),
8821+
ScalarValue::TimestampNanosecond(Some(2_000_000_000), None),
8822+
1_000_000_000,
8823+
),
87698824
];
87708825
for (lhs, rhs, expected) in cases.iter() {
87718826
let distance = lhs.distance(rhs).unwrap();
@@ -8828,8 +8883,6 @@ mod tests {
88288883
ScalarValue::Boolean(Some(true)),
88298884
ScalarValue::Boolean(Some(false)),
88308885
),
8831-
(ScalarValue::Date32(Some(0)), ScalarValue::Date32(Some(1))),
8832-
(ScalarValue::Date64(Some(0)), ScalarValue::Date64(Some(1))),
88338886
(
88348887
ScalarValue::Decimal128(Some(123), 5, 5),
88358888
ScalarValue::Decimal128(Some(120), 5, 3),

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -150,15 +150,16 @@ mod test {
150150
// - null_count = 0 (partition values from paths are never null)
151151
// - min/max are the merged partition values across files in the group
152152
// - byte_size = num_rows * 4 (Date32 is 4 bytes per row)
153-
// - distinct_count = Inexact(1) per partition file (single partition value per file),
154-
// preserved via max() when merging stats across partitions
153+
// - distinct_count = Inexact(max_date - min_date + 1), derived from the
154+
// date range via interval analysis for temporal types
155155
let date32_byte_size = num_rows * 4;
156+
let distinct_dates = (max_date - min_date + 1) as usize;
156157
column_stats.push(ColumnStatistics {
157158
null_count: Precision::Exact(0),
158159
max_value: Precision::Exact(ScalarValue::Date32(Some(max_date))),
159160
min_value: Precision::Exact(ScalarValue::Date32(Some(min_date))),
160161
sum_value: Precision::Absent,
161-
distinct_count: Precision::Inexact(1),
162+
distinct_count: Precision::Inexact(distinct_dates),
162163
byte_size: Precision::Exact(date32_byte_size),
163164
});
164165
}
@@ -579,7 +580,7 @@ mod test {
579580
max_value: Precision::Exact(ScalarValue::Date32(Some(20151))),
580581
min_value: Precision::Exact(ScalarValue::Date32(Some(20148))),
581582
sum_value: Precision::Absent,
582-
distinct_count: Precision::Inexact(1),
583+
distinct_count: Precision::Inexact(4),
583584
byte_size: Precision::Absent,
584585
},
585586
// column 2: right.id (Int32, file column from t2) - right partition 0: ids [3,4]
@@ -613,7 +614,7 @@ mod test {
613614
max_value: Precision::Exact(ScalarValue::Date32(Some(20151))),
614615
min_value: Precision::Exact(ScalarValue::Date32(Some(20148))),
615616
sum_value: Precision::Absent,
616-
distinct_count: Precision::Inexact(1),
617+
distinct_count: Precision::Inexact(4),
617618
byte_size: Precision::Absent,
618619
},
619620
// column 2: right.id (Int32, file column from t2) - right partition 1: ids [1,2]
@@ -1252,7 +1253,7 @@ mod test {
12521253
DATE_2025_03_01,
12531254
))),
12541255
sum_value: Precision::Absent,
1255-
distinct_count: Precision::Inexact(1),
1256+
distinct_count: Precision::Inexact(2),
12561257
byte_size: Precision::Exact(8),
12571258
},
12581259
ColumnStatistics::new_unknown(), // window column
@@ -1280,7 +1281,7 @@ mod test {
12801281
DATE_2025_03_03,
12811282
))),
12821283
sum_value: Precision::Absent,
1283-
distinct_count: Precision::Inexact(1),
1284+
distinct_count: Precision::Inexact(2),
12841285
byte_size: Precision::Exact(8),
12851286
},
12861287
ColumnStatistics::new_unknown(), // window column
@@ -1417,8 +1418,7 @@ mod test {
14171418
byte_size: Precision::Exact(16),
14181419
},
14191420
// Left date column: all partitions (2025-03-01..2025-03-04)
1420-
// NDV is Inexact(1) because each Hive partition has exactly 1 distinct date value,
1421-
// and merging takes max as a conservative lower bound
1421+
// NDV is Inexact(4) derived from the date range via interval analysis
14221422
ColumnStatistics {
14231423
null_count: Precision::Exact(0),
14241424
max_value: Precision::Exact(ScalarValue::Date32(Some(
@@ -1428,7 +1428,7 @@ mod test {
14281428
DATE_2025_03_01,
14291429
))),
14301430
sum_value: Precision::Absent,
1431-
distinct_count: Precision::Inexact(1),
1431+
distinct_count: Precision::Inexact(4),
14321432
byte_size: Precision::Exact(16),
14331433
},
14341434
// Right id column: partition 0 only (id 3..4)
@@ -1441,7 +1441,7 @@ mod test {
14411441
byte_size: Precision::Exact(8),
14421442
},
14431443
// Right date column: partition 0 only (2025-03-01..2025-03-02)
1444-
// NDV is Inexact(1) from the single Hive partition's date value
1444+
// NDV is Inexact(2) derived from the date range via interval analysis
14451445
ColumnStatistics {
14461446
null_count: Precision::Exact(0),
14471447
max_value: Precision::Exact(ScalarValue::Date32(Some(
@@ -1451,7 +1451,7 @@ mod test {
14511451
DATE_2025_03_01,
14521452
))),
14531453
sum_value: Precision::Absent,
1454-
distinct_count: Precision::Inexact(1),
1454+
distinct_count: Precision::Inexact(2),
14551455
byte_size: Precision::Exact(8),
14561456
},
14571457
],
@@ -1503,7 +1503,7 @@ mod test {
15031503
DATE_2025_03_01,
15041504
))),
15051505
sum_value: Precision::Absent,
1506-
distinct_count: Precision::Inexact(1),
1506+
distinct_count: Precision::Inexact(2),
15071507
byte_size: Precision::Exact(8),
15081508
},
15091509
// Right id column: partition 0 only (id 3..4)
@@ -1525,7 +1525,7 @@ mod test {
15251525
DATE_2025_03_01,
15261526
))),
15271527
sum_value: Precision::Absent,
1528-
distinct_count: Precision::Inexact(1),
1528+
distinct_count: Precision::Inexact(2),
15291529
byte_size: Precision::Exact(8),
15301530
},
15311531
],
@@ -1577,7 +1577,7 @@ mod test {
15771577
DATE_2025_03_01,
15781578
))),
15791579
sum_value: Precision::Absent,
1580-
distinct_count: Precision::Inexact(1),
1580+
distinct_count: Precision::Inexact(4),
15811581
byte_size: Precision::Exact(16),
15821582
},
15831583
// Right id column: all partitions (id 1..4)
@@ -1599,7 +1599,7 @@ mod test {
15991599
DATE_2025_03_01,
16001600
))),
16011601
sum_value: Precision::Absent,
1602-
distinct_count: Precision::Inexact(1),
1602+
distinct_count: Precision::Inexact(4),
16031603
byte_size: Precision::Exact(16),
16041604
},
16051605
],

datafusion/expr-common/src/interval_arithmetic.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -929,7 +929,12 @@ impl Interval {
929929
/// when the calculated cardinality does not fit in an `u64`.
930930
pub fn cardinality(&self) -> Option<u64> {
931931
let data_type = self.data_type();
932-
if data_type.is_integer() {
932+
if data_type.is_integer()
933+
|| matches!(
934+
data_type,
935+
DataType::Date32 | DataType::Date64 | DataType::Timestamp(_, _)
936+
)
937+
{
933938
self.upper.distance(&self.lower).map(|diff| diff as u64)
934939
} else if data_type.is_floating() {
935940
// Negative numbers are sorted in the reverse order. To
@@ -3958,6 +3963,31 @@ mod tests {
39583963
)?;
39593964
assert_eq!(interval.cardinality().unwrap(), 2);
39603965

3966+
// Temporal types
3967+
let interval = Interval::try_new(
3968+
ScalarValue::Date32(Some(0)),
3969+
ScalarValue::Date32(Some(10)),
3970+
)?;
3971+
assert_eq!(interval.cardinality().unwrap(), 11);
3972+
3973+
let interval = Interval::try_new(
3974+
ScalarValue::Date64(Some(1000)),
3975+
ScalarValue::Date64(Some(5000)),
3976+
)?;
3977+
assert_eq!(interval.cardinality().unwrap(), 4001);
3978+
3979+
let interval = Interval::try_new(
3980+
ScalarValue::TimestampSecond(Some(100), None),
3981+
ScalarValue::TimestampSecond(Some(200), None),
3982+
)?;
3983+
assert_eq!(interval.cardinality().unwrap(), 101);
3984+
3985+
let interval = Interval::try_new(
3986+
ScalarValue::TimestampNanosecond(Some(1_000_000_000), None),
3987+
ScalarValue::TimestampNanosecond(Some(2_000_000_000), None),
3988+
)?;
3989+
assert_eq!(interval.cardinality().unwrap(), 1_000_000_001);
3990+
39613991
Ok(())
39623992
}
39633993

datafusion/physical-expr/src/intervals/utils.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ pub fn is_datatype_supported(data_type: &DataType) -> bool {
103103
| &DataType::UInt8
104104
| &DataType::Float64
105105
| &DataType::Float32
106+
| &DataType::Date32
107+
| &DataType::Date64
108+
| &DataType::Timestamp(_, _)
106109
)
107110
}
108111

datafusion/physical-plan/src/filter.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2892,6 +2892,52 @@ mod tests {
28922892
Ok(())
28932893
}
28942894

2895+
#[tokio::test]
2896+
async fn test_filter_statistics_equality_timestamp_ndv() -> Result<()> {
2897+
// ts: min=1_000_000_000, max=2_000_000_000, ndv=500
2898+
let schema = Schema::new(vec![Field::new(
2899+
"ts",
2900+
DataType::Timestamp(arrow::datatypes::TimeUnit::Nanosecond, None),
2901+
false,
2902+
)]);
2903+
let input = Arc::new(StatisticsExec::new(
2904+
Statistics {
2905+
num_rows: Precision::Inexact(1000),
2906+
total_byte_size: Precision::Inexact(8000),
2907+
column_statistics: vec![ColumnStatistics {
2908+
min_value: Precision::Inexact(ScalarValue::TimestampNanosecond(
2909+
Some(1_000_000_000),
2910+
None,
2911+
)),
2912+
max_value: Precision::Inexact(ScalarValue::TimestampNanosecond(
2913+
Some(2_000_000_000),
2914+
None,
2915+
)),
2916+
distinct_count: Precision::Inexact(500),
2917+
..Default::default()
2918+
}],
2919+
},
2920+
schema.clone(),
2921+
));
2922+
2923+
let predicate = Arc::new(BinaryExpr::new(
2924+
Arc::new(Column::new("ts", 0)),
2925+
Operator::Eq,
2926+
Arc::new(Literal::new(ScalarValue::TimestampNanosecond(
2927+
Some(1_500_000_000),
2928+
None,
2929+
))),
2930+
));
2931+
let filter: Arc<dyn ExecutionPlan> =
2932+
Arc::new(FilterExec::try_new(predicate, input)?);
2933+
let statistics = filter.partition_statistics(None)?;
2934+
assert_eq!(
2935+
statistics.column_statistics[0].distinct_count,
2936+
Precision::Exact(1)
2937+
);
2938+
Ok(())
2939+
}
2940+
28952941
#[test]
28962942
fn test_collect_equality_columns() {
28972943
use std::collections::HashSet;

0 commit comments

Comments
 (0)