Skip to content

Commit 361e165

Browse files
committed
ceil and test
1 parent d018d5f commit 361e165

2 files changed

Lines changed: 117 additions & 2 deletions

File tree

datafusion/functions/src/math/ceil.rs

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,11 +191,66 @@ impl ScalarUDFImpl for CeilFunc {
191191
}
192192

193193
fn evaluate_bounds(&self, inputs: &[&Interval]) -> Result<Interval> {
194-
let data_type = inputs[0].data_type();
195-
Interval::make_unbounded(&data_type)
194+
let [input] = inputs else {
195+
return Interval::make_unbounded(&DataType::Float64);
196+
};
197+
let data_type = input.data_type();
198+
match (ceil_scalar(input.lower()), ceil_scalar(input.upper())) {
199+
(Some(lo), Some(hi)) => Interval::try_new(lo, hi)
200+
.or_else(|_| Interval::make_unbounded(&data_type)),
201+
_ => Interval::make_unbounded(&data_type),
202+
}
203+
}
204+
205+
fn propagate_constraints(
206+
&self,
207+
interval: &Interval,
208+
inputs: &[&Interval],
209+
) -> Result<Option<Vec<Interval>>> {
210+
let [input_interval] = inputs else {
211+
return Ok(Some(vec![]));
212+
};
213+
// ceil(x) ∈ [N, M] → x ∈ (N−1, M] — conservative closed: [N−1, M]
214+
let lo = match interval.lower() {
215+
ScalarValue::Float64(Some(n)) if n.is_finite() => {
216+
Some(ScalarValue::Float64(Some(n - 1.0)))
217+
}
218+
ScalarValue::Float32(Some(n)) if n.is_finite() => {
219+
Some(ScalarValue::Float32(Some(n - 1.0)))
220+
}
221+
_ => None,
222+
};
223+
let hi = match interval.upper() {
224+
ScalarValue::Float64(Some(n)) if n.is_finite() => {
225+
Some(ScalarValue::Float64(Some(*n)))
226+
}
227+
ScalarValue::Float32(Some(n)) if n.is_finite() => {
228+
Some(ScalarValue::Float32(Some(*n)))
229+
}
230+
_ => None,
231+
};
232+
match (lo, hi) {
233+
(Some(lo), Some(hi)) => {
234+
let constraint = Interval::try_new(lo, hi)?;
235+
Ok(input_interval.intersect(constraint)?.map(|r| vec![r]))
236+
}
237+
_ => Ok(Some(vec![])),
238+
}
196239
}
197240

198241
/// Returns this function's documentation by delegating to `self.doc()`.
fn documentation(&self) -> Option<&Documentation> {
199242
self.doc()
200243
}
201244
}
245+
246+
fn ceil_scalar(v: &ScalarValue) -> Option<ScalarValue> {
247+
match v {
248+
ScalarValue::Float64(Some(f)) if f.is_finite() => {
249+
Some(ScalarValue::Float64(Some(f.ceil())))
250+
}
251+
ScalarValue::Float32(Some(f)) if f.is_finite() => {
252+
Some(ScalarValue::Float32(Some(f.ceil())))
253+
}
254+
_ => None,
255+
}
256+
}

datafusion/physical-plan/src/filter.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1209,6 +1209,66 @@ mod tests {
12091209
Ok(())
12101210
}
12111211

1212+
#[tokio::test]
1213+
async fn test_filter_statistics_ceil_scalar_fn() -> Result<()> {
1214+
// Table: x Float64, min=8.0, max=16.0, 100 rows.
1215+
// Filter: ceil(x) > 12.0
1216+
//
1217+
// The range [8.0, 16.0] lies within a single IEEE-754 binade so
1218+
// Float64 cardinality is proportional to the value range, making
1219+
// the selectivity estimate predictable.
1220+
//
1221+
// With check_support recognising ScalarFunctionExpr and CeilFunc
1222+
// implementing evaluate_bounds/propagate_constraints the solver
1223+
// narrows x to roughly [11.0, 16.0]:
1224+
// ceil(x) > 12 → x ∈ (11, 16] → conservative [11, 16]
1225+
// selectivity ≈ (16−11)/(16−8) = 5/8 = 0.625 → ~62 rows
1226+
//
1227+
// Without the fix the estimate stays at 100 (no interval analysis).
1228+
let schema = Schema::new(vec![Field::new("x", DataType::Float64, false)]);
1229+
let input = Arc::new(StatisticsExec::new(
1230+
Statistics {
1231+
num_rows: Precision::Inexact(100),
1232+
total_byte_size: Precision::Absent,
1233+
column_statistics: vec![ColumnStatistics {
1234+
min_value: Precision::Inexact(ScalarValue::Float64(Some(8.0))),
1235+
max_value: Precision::Inexact(ScalarValue::Float64(Some(16.0))),
1236+
..Default::default()
1237+
}],
1238+
},
1239+
schema.clone(),
1240+
));
1241+
1242+
let x = col("x", &schema)?;
1243+
let ceil_udf = datafusion_functions::math::ceil();
1244+
let config = Arc::new(ConfigOptions::new());
1245+
let ceil_x: Arc<dyn PhysicalExpr> =
1246+
Arc::new(datafusion_physical_expr::ScalarFunctionExpr::try_new(
1247+
Arc::clone(&ceil_udf),
1248+
vec![x],
1249+
&schema,
1250+
config,
1251+
)?);
1252+
let predicate = binary(ceil_x, Operator::Gt, lit(12.0f64), &schema)?;
1253+
1254+
let filter = Arc::new(FilterExec::try_new(predicate, input)?);
1255+
let statistics = filter.partition_statistics(None)?;
1256+
1257+
let num_rows = statistics.num_rows.get_value().copied().unwrap_or(100);
1258+
// Interval analysis must narrow the estimate below the full 100-row input.
1259+
assert!(
1260+
num_rows < 100,
1261+
"expected interval analysis to narrow row estimate, got {num_rows}"
1262+
);
1263+
// The conservative bound is x ∈ [11, 16] out of [8, 16] → ~62 rows.
1264+
// Allow a generous range to be robust to float-cardinality rounding.
1265+
assert!(
1266+
num_rows >= 50,
1267+
"expected at least 50 rows after ceil(x) > 12.0 on [8,16], got {num_rows}"
1268+
);
1269+
Ok(())
1270+
}
1271+
12121272
#[tokio::test]
12131273
async fn test_filter_statistics_basic_expr() -> Result<()> {
12141274
// Table:

0 commit comments

Comments
 (0)