Skip to content
7 changes: 5 additions & 2 deletions datafusion/expr/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -929,9 +929,12 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync + Any {
fn propagate_constraints(
&self,
_interval: &Interval,
_inputs: &[&Interval],
inputs: &[&Interval],
) -> Result<Option<Vec<Interval>>> {
Ok(Some(vec![]))
// Conservative default: return inputs unchanged (no narrowing).
// The returned vec must have the same length as `inputs` to satisfy
// the interval solver contract.
Ok(Some(inputs.iter().map(|i| (*i).clone()).collect()))
}

/// Calculates the [`SortProperties`] of this function based on its children's properties.
Expand Down
231 changes: 229 additions & 2 deletions datafusion/functions/src/math/ceil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,238 @@ impl ScalarUDFImpl for CeilFunc {
}

fn evaluate_bounds(&self, inputs: &[&Interval]) -> Result<Interval> {
let data_type = inputs[0].data_type();
Interval::make_unbounded(&data_type)
let [input] = inputs else {
return exec_err!(
"ceil expected 1 argument for bounds evaluation, got {}",
inputs.len()
);
};
let data_type = input.data_type();
match (ceil_scalar(input.lower()), ceil_scalar(input.upper())) {
(Some(lo), Some(hi)) => Interval::try_new(lo, hi)
.or_else(|_| Interval::make_unbounded(&data_type)),
Comment on lines +202 to +203
_ => Interval::make_unbounded(&data_type),
}
}

fn propagate_constraints(
&self,
interval: &Interval,
inputs: &[&Interval],
) -> Result<Option<Vec<Interval>>> {
let [input_interval] = inputs else {
return exec_err!(
"ceil expected 1 argument for constraint propagation, got {}",
inputs.len()
);
};
Comment thread
davidlghellin marked this conversation as resolved.
// ceil(x) ∈ [N, M] → x ∈ (ceil(N)−1, floor(M)]
// Normalize bounds to integers ceil can actually take before mapping back.
let lo = match interval.lower() {
ScalarValue::Float64(Some(n)) if n.is_finite() => {
Some(ScalarValue::Float64(Some(n.ceil() - 1.0)))
}
ScalarValue::Float32(Some(n)) if n.is_finite() => {
Some(ScalarValue::Float32(Some(n.ceil() - 1.0)))
}
_ => None,
};
let hi = match interval.upper() {
ScalarValue::Float64(Some(n)) if n.is_finite() => {
Some(ScalarValue::Float64(Some(n.floor())))
}
ScalarValue::Float32(Some(n)) if n.is_finite() => {
Some(ScalarValue::Float32(Some(n.floor())))
}
_ => None,
};
match (lo, hi) {
(Some(lo), Some(hi)) => {
let constraint = Interval::try_new(lo, hi)?;
Ok(input_interval.intersect(constraint)?.map(|r| vec![r]))
}
_ => Ok(Some(vec![(*input_interval).clone()])),
Comment on lines +239 to +244
}
}
Comment thread
davidlghellin marked this conversation as resolved.

fn documentation(&self) -> Option<&Documentation> {
self.doc()
}
}

fn ceil_scalar(v: &ScalarValue) -> Option<ScalarValue> {
match v {
ScalarValue::Float64(Some(f)) if f.is_finite() => {
Some(ScalarValue::Float64(Some(f.ceil())))
}
ScalarValue::Float32(Some(f)) if f.is_finite() => {
Some(ScalarValue::Float32(Some(f.ceil())))
}
_ => None,
}
}

#[cfg(test)]
mod tests {
use super::*;

fn ceil() -> CeilFunc {
CeilFunc::new()
}

fn f64_interval(lo: f64, hi: f64) -> Interval {
Interval::try_new(
ScalarValue::Float64(Some(lo)),
ScalarValue::Float64(Some(hi)),
)
.unwrap()
}

fn f32_interval(lo: f32, hi: f32) -> Interval {
Interval::try_new(
ScalarValue::Float32(Some(lo)),
ScalarValue::Float32(Some(hi)),
)
.unwrap()
}

fn unbounded_f64() -> Interval {
Interval::make_unbounded(&DataType::Float64).unwrap()
}

fn unbounded_f32() -> Interval {
Interval::make_unbounded(&DataType::Float32).unwrap()
}

// --- evaluate_bounds ---

#[test]
fn test_evaluate_bounds_basic() {
// ceil([1.2, 3.7]) = [2.0, 4.0]
let input = f64_interval(1.2, 3.7);
let result = ceil().evaluate_bounds(&[&input]).unwrap();
assert_eq!(result, f64_interval(2.0, 4.0));
}

#[test]
fn test_evaluate_bounds_already_integer() {
// ceil([2.0, 4.0]) = [2.0, 4.0]
let input = f64_interval(2.0, 4.0);
let result = ceil().evaluate_bounds(&[&input]).unwrap();
assert_eq!(result, f64_interval(2.0, 4.0));
}

#[test]
fn test_evaluate_bounds_f32() {
// ceil([1.1f32, 2.9f32]) = [2.0f32, 3.0f32]
let input = f32_interval(1.1, 2.9);
let result = ceil().evaluate_bounds(&[&input]).unwrap();
assert_eq!(result, f32_interval(2.0, 3.0));
}

#[test]
fn test_evaluate_bounds_unbounded_returns_unbounded() {
let input = unbounded_f64();
let result = ceil().evaluate_bounds(&[&input]).unwrap();
assert_eq!(result, unbounded_f64());
}

#[test]
fn test_evaluate_bounds_negative() {
// ceil([-3.7, -1.2]) = [-3.0, -1.0]
let input = f64_interval(-3.7, -1.2);
let result = ceil().evaluate_bounds(&[&input]).unwrap();
assert_eq!(result, f64_interval(-3.0, -1.0));
}

// --- propagate_constraints ---

#[test]
fn test_propagate_constraints_basic() {
// ceil(x) ∈ [13.0, 15.0] → x ∈ (12.0, 15.0]
let output = f64_interval(13.0, 15.0);
let input = unbounded_f64();
let result = ceil()
.propagate_constraints(&output, &[&input])
.unwrap()
.unwrap();
assert_eq!(result[0], f64_interval(12.0, 15.0));
}

#[test]
fn test_propagate_constraints_non_integer_bounds() {
// ceil(x) ∈ [12.3, 14.7] — non-integer bounds are normalized:
// lower: ceil(12.3)-1 = 13-1 = 12.0, upper: floor(14.7) = 14.0
// → x ∈ (12.0, 14.0]
let output = f64_interval(12.3, 14.7);
let input = unbounded_f64();
let result = ceil()
.propagate_constraints(&output, &[&input])
.unwrap()
.unwrap();
assert_eq!(result[0], f64_interval(12.0, 14.0));
}

#[test]
fn test_propagate_constraints_f32() {
// Same as basic but with Float32
let output = f32_interval(5.0, 8.0);
let input = unbounded_f32();
let result = ceil()
.propagate_constraints(&output, &[&input])
.unwrap()
.unwrap();
assert_eq!(result[0], f32_interval(4.0, 8.0));
}

#[test]
fn test_propagate_constraints_unbounded_output_no_change() {
// No output constraint → input unchanged
let output = unbounded_f64();
let input = f64_interval(1.0, 10.0);
let result = ceil()
.propagate_constraints(&output, &[&input])
.unwrap()
.unwrap();
assert_eq!(result[0], input);
}

#[test]
fn test_propagate_constraints_nan_output_no_change() {
// NaN bounds → conservative: input unchanged
let output = Interval::try_new(
ScalarValue::Float64(Some(f64::NAN)),
ScalarValue::Float64(Some(f64::NAN)),
)
.unwrap();
let input = f64_interval(0.0, 100.0);
let result = ceil()
.propagate_constraints(&output, &[&input])
.unwrap()
.unwrap();
assert_eq!(result[0], input);
}

#[test]
fn test_propagate_constraints_negative_range() {
// ceil(x) ∈ [-3.0, -1.0] → x ∈ (-4.0, -1.0]
let output = f64_interval(-3.0, -1.0);
let input = unbounded_f64();
let result = ceil()
.propagate_constraints(&output, &[&input])
.unwrap()
.unwrap();
assert_eq!(result[0], f64_interval(-4.0, -1.0));
}

#[test]
fn test_propagate_constraints_empty_intersection() {
// x ∈ [5.0, 7.0], constraint ceil(x) ∈ [20.0, 30.0]
// mapped input constraint: [19.0, 30.0] — no overlap with [5.0, 7.0]
// → intersect returns None → Ok(None) (branch pruned)
let output = f64_interval(20.0, 30.0);
let input = f64_interval(5.0, 7.0);
let result = ceil().propagate_constraints(&output, &[&input]).unwrap();
assert!(result.is_none());
}
}
Loading
Loading