From 3a24ce38f0099ee3c936d0378611d2e637edd1f8 Mon Sep 17 00:00:00 2001 From: Shiv Bhatia Date: Fri, 27 Feb 2026 12:17:57 +0000 Subject: [PATCH 1/5] Add floor --- datafusion/spark/src/function/math/floor.rs | 198 ++++++++++++++++++++ datafusion/spark/src/function/math/mod.rs | 8 + 2 files changed, 206 insertions(+) create mode 100644 datafusion/spark/src/function/math/floor.rs diff --git a/datafusion/spark/src/function/math/floor.rs b/datafusion/spark/src/function/math/floor.rs new file mode 100644 index 0000000000000..13b0e5bbfb236 --- /dev/null +++ b/datafusion/spark/src/function/math/floor.rs @@ -0,0 +1,198 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::sync::Arc; + +use arrow::array::{AsArray, Decimal128Array}; +use arrow::compute::cast; +use arrow::datatypes::{DataType, Decimal128Type, Float32Type, Float64Type, Int64Type}; +use datafusion_common::utils::take_function_args; +use datafusion_common::{exec_err, Result}; +use datafusion_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; + +/// Spark-compatible `floor` expression +/// +/// +/// Differences with DataFusion floor: +/// - Spark's floor returns Int64 for float/integer types +/// - Spark's floor adjusts precision for Decimal128 types +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkFloor { + signature: Signature, +} + +impl Default for SparkFloor { + fn default() -> Self { + Self::new() + } +} + +impl SparkFloor { + pub fn new() -> Self { + Self { + signature: Signature::numeric(1, Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for SparkFloor { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "floor" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + match &arg_types[0] { + DataType::Decimal128(p, s) if *s > 0 => { + let new_p = ((*p as i64) - (*s as i64) + 1).clamp(1, 38) as u8; + Ok(DataType::Decimal128(new_p, 0)) + } + DataType::Decimal128(p, s) => Ok(DataType::Decimal128(*p, *s)), + _ => Ok(DataType::Int64), + } + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let return_type = args.return_type().clone(); + spark_floor(&args.args, &return_type) + } +} + +fn spark_floor(args: &[ColumnarValue], return_type: &DataType) -> Result { + let input = match take_function_args("floor", args)? { + [ColumnarValue::Scalar(value)] => value.to_array()?, + [ColumnarValue::Array(arr)] => Arc::clone(arr), + }; + + let result = match input.data_type() { + DataType::Float32 => Arc::new( + input + .as_primitive::() + .unary::<_, Int64Type>(|x| x.floor() as i64), + ) as _, + DataType::Float64 => Arc::new( + input + .as_primitive::() + .unary::<_, Int64Type>(|x| x.floor() as i64), + ) as _, + dt if dt.is_integer() => cast(&input, &DataType::Int64)?, + DataType::Decimal128(_, s) if *s > 0 => { + let div = 10_i128.pow(*s as u32); + let result: Decimal128Array = + input.as_primitive::().unary(|x| { + let d = x / div; + let r = x % div; + if r < 0 { d - 1 } else { d } + }); + Arc::new(result.with_data_type(return_type.clone())) + } + DataType::Decimal128(_, _) => input, + other => return exec_err!("Unsupported data type {other:?} for function floor"), + }; + + Ok(ColumnarValue::Array(result)) +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{Decimal128Array, Float32Array, Float64Array, Int64Array}; + use datafusion_common::ScalarValue; + + #[test] + fn test_floor_float64() { + let input = Float64Array::from(vec![Some(1.9), Some(-1.1), Some(0.0), None]); + let args = vec![ColumnarValue::Array(Arc::new(input))]; + let result = spark_floor(&args, &DataType::Int64).unwrap(); + let result = match result { + ColumnarValue::Array(arr) => arr, + _ => panic!("Expected array"), + }; + let result = result.as_primitive::(); + assert_eq!( + result, + &Int64Array::from(vec![Some(1), Some(-2), Some(0), None]) + ); + } + + #[test] + fn test_floor_float32() { + let input = Float32Array::from(vec![Some(1.5f32), Some(-1.5f32)]); + let args = vec![ColumnarValue::Array(Arc::new(input))]; + let result = spark_floor(&args, &DataType::Int64).unwrap(); + let result = match result { + ColumnarValue::Array(arr) => arr, + _ => panic!("Expected array"), + }; + let result = result.as_primitive::(); + assert_eq!(result, &Int64Array::from(vec![Some(1), Some(-2)])); + } + + #[test] + fn test_floor_int64() { + let input = Int64Array::from(vec![Some(1), Some(-1), None]); + let args = vec![ColumnarValue::Array(Arc::new(input))]; + let result = spark_floor(&args, &DataType::Int64).unwrap(); + let result = match result { + ColumnarValue::Array(arr) => arr, + _ => panic!("Expected array"), + }; + let result = result.as_primitive::(); + assert_eq!(result, &Int64Array::from(vec![Some(1), Some(-1), None])); + } + + #[test] + fn test_floor_decimal128() { + // Decimal128(10, 2): 150 = 1.50, -150 = -1.50, 100 = 1.00 + let return_type = DataType::Decimal128(9, 0); + let input = Decimal128Array::from(vec![Some(150), Some(-150), Some(100), None]) + .with_data_type(DataType::Decimal128(10, 2)); + let args = vec![ColumnarValue::Array(Arc::new(input))]; + let result = spark_floor(&args, &return_type).unwrap(); + let result = match result { + ColumnarValue::Array(arr) => arr, + _ => panic!("Expected array"), + }; + let result = result.as_primitive::(); + let expected = Decimal128Array::from(vec![Some(1), Some(-2), Some(1), None]) + .with_data_type(return_type); + assert_eq!(result, &expected); + } + + #[test] + fn test_floor_scalar() { + let input = ScalarValue::Float64(Some(1.9)); + let args = vec![ColumnarValue::Scalar(input)]; + let result = spark_floor(&args, &DataType::Int64).unwrap(); + let result = match result { + ColumnarValue::Array(arr) => arr, + _ => panic!("Expected array"), + }; + let result = result.as_primitive::(); + assert_eq!(result, &Int64Array::from(vec![Some(1)])); + } +} diff --git a/datafusion/spark/src/function/math/mod.rs b/datafusion/spark/src/function/math/mod.rs index 7f7d04e06b0be..3231776f26bad 100644 --- a/datafusion/spark/src/function/math/mod.rs +++ b/datafusion/spark/src/function/math/mod.rs @@ -18,6 +18,7 @@ pub mod abs; pub mod bin; pub mod expm1; +pub mod floor; pub mod factorial; pub mod hex; pub mod modulus; @@ -34,6 +35,7 @@ use std::sync::Arc; make_udf_function!(abs::SparkAbs, abs); make_udf_function!(expm1::SparkExpm1, expm1); make_udf_function!(factorial::SparkFactorial, factorial); +make_udf_function!(floor::SparkFloor, floor); make_udf_function!(hex::SparkHex, hex); make_udf_function!(modulus::SparkMod, modulus); make_udf_function!(modulus::SparkPmod, pmod); @@ -55,6 +57,11 @@ pub mod expr_fn { "Returns the factorial of expr. expr is [0..20]. Otherwise, null.", arg1 )); + export_functions!(( + floor, + "Returns the largest integer not greater than expr.", + arg1 + )); export_functions!((hex, "Computes hex value of the given column.", arg1)); export_functions!((modulus, "Returns the remainder of division of the first argument by the second argument.", arg1 arg2)); export_functions!((pmod, "Returns the positive remainder of division of the first argument by the second argument.", arg1 arg2)); @@ -84,6 +91,7 @@ pub fn functions() -> Vec> { abs(), expm1(), factorial(), + floor(), hex(), modulus(), pmod(), From dd3356eb71546f0011c7c4ca2c9c998e63a702e1 Mon Sep 17 00:00:00 2001 From: Shiv Bhatia Date: Fri, 27 Feb 2026 13:38:26 +0000 Subject: [PATCH 2/5] add comet tests --- datafusion/spark/src/function/math/floor.rs | 77 ++++++++++++++++++--- 1 file changed, 69 insertions(+), 8 deletions(-) diff --git a/datafusion/spark/src/function/math/floor.rs b/datafusion/spark/src/function/math/floor.rs index 13b0e5bbfb236..d2546dc337686 100644 --- a/datafusion/spark/src/function/math/floor.rs +++ b/datafusion/spark/src/function/math/floor.rs @@ -22,7 +22,7 @@ use arrow::array::{AsArray, Decimal128Array}; use arrow::compute::cast; use arrow::datatypes::{DataType, Decimal128Type, Float32Type, Float64Type, Int64Type}; use datafusion_common::utils::take_function_args; -use datafusion_common::{exec_err, Result}; +use datafusion_common::{Result, exec_err}; use datafusion_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, }; @@ -125,7 +125,15 @@ mod tests { #[test] fn test_floor_float64() { - let input = Float64Array::from(vec![Some(1.9), Some(-1.1), Some(0.0), None]); + let input = Float64Array::from(vec![ + Some(125.9345), + Some(15.9999), + Some(0.9), + Some(-0.1), + Some(-1.999), + Some(123.0), + None, + ]); let args = vec![ColumnarValue::Array(Arc::new(input))]; let result = spark_floor(&args, &DataType::Int64).unwrap(); let result = match result { @@ -135,13 +143,29 @@ mod tests { let result = result.as_primitive::(); assert_eq!( result, - &Int64Array::from(vec![Some(1), Some(-2), Some(0), None]) + &Int64Array::from(vec![ + Some(125), + Some(15), + Some(0), + Some(-1), + Some(-2), + Some(123), + None, + ]) ); } #[test] fn test_floor_float32() { - let input = Float32Array::from(vec![Some(1.5f32), Some(-1.5f32)]); + let input = Float32Array::from(vec![ + Some(125.9345f32), + Some(15.9999f32), + Some(0.9f32), + Some(-0.1f32), + Some(-1.999f32), + Some(123.0f32), + None, + ]); let args = vec![ColumnarValue::Array(Arc::new(input))]; let result = spark_floor(&args, &DataType::Int64).unwrap(); let result = match result { @@ -149,7 +173,18 @@ mod tests { _ => panic!("Expected array"), }; let result = result.as_primitive::(); - assert_eq!(result, &Int64Array::from(vec![Some(1), Some(-2)])); + assert_eq!( + result, + &Int64Array::from(vec![ + Some(125), + Some(15), + Some(0), + Some(-1), + Some(-2), + Some(123), + None, + ]) + ); } #[test] @@ -184,8 +219,34 @@ mod tests { } #[test] - fn test_floor_scalar() { - let input = ScalarValue::Float64(Some(1.9)); + fn test_floor_float64_scalar() { + let input = ScalarValue::Float64(Some(-1.999)); + let args = vec![ColumnarValue::Scalar(input)]; + let result = spark_floor(&args, &DataType::Int64).unwrap(); + let result = match result { + ColumnarValue::Array(arr) => arr, + _ => panic!("Expected array"), + }; + let result = result.as_primitive::(); + assert_eq!(result, &Int64Array::from(vec![Some(-2)])); + } + + #[test] + fn test_floor_float32_scalar() { + let input = ScalarValue::Float32(Some(125.9345f32)); + let args = vec![ColumnarValue::Scalar(input)]; + let result = spark_floor(&args, &DataType::Int64).unwrap(); + let result = match result { + ColumnarValue::Array(arr) => arr, + _ => panic!("Expected array"), + }; + let result = result.as_primitive::(); + assert_eq!(result, &Int64Array::from(vec![Some(125)])); + } + + #[test] + fn test_floor_int64_scalar() { + let input = ScalarValue::Int64(Some(48)); let args = vec![ColumnarValue::Scalar(input)]; let result = spark_floor(&args, &DataType::Int64).unwrap(); let result = match result { @@ -193,6 +254,6 @@ mod tests { _ => panic!("Expected array"), }; let result = result.as_primitive::(); - assert_eq!(result, &Int64Array::from(vec![Some(1)])); + assert_eq!(result, &Int64Array::from(vec![Some(48)])); } } From b30a4a80dad6093830c0cef778feeff53e3b945d Mon Sep 17 00:00:00 2001 From: Shiv Bhatia Date: Fri, 27 Feb 2026 13:39:37 +0000 Subject: [PATCH 3/5] fmt --- datafusion/spark/src/function/math/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/spark/src/function/math/mod.rs b/datafusion/spark/src/function/math/mod.rs index 3231776f26bad..0f4dbd63b1b67 100644 --- a/datafusion/spark/src/function/math/mod.rs +++ b/datafusion/spark/src/function/math/mod.rs @@ -18,8 +18,8 @@ pub mod abs; pub mod bin; pub mod expm1; -pub mod floor; pub mod factorial; +pub mod floor; pub mod hex; pub mod modulus; pub mod negative; From c244741633c8ab93cb3986d0b4b30bb79d890c10 Mon Sep 17 00:00:00 2001 From: Shiv Bhatia Date: Sat, 28 Feb 2026 09:42:15 +0000 Subject: [PATCH 4/5] Add .slt tests --- datafusion/spark/src/function/math/floor.rs | 8 +- .../test_files/spark/math/floor.slt | 113 +++++++++++++++++- 2 files changed, 115 insertions(+), 6 deletions(-) diff --git a/datafusion/spark/src/function/math/floor.rs b/datafusion/spark/src/function/math/floor.rs index d2546dc337686..b0de4b962ab8f 100644 --- a/datafusion/spark/src/function/math/floor.rs +++ b/datafusion/spark/src/function/math/floor.rs @@ -31,8 +31,12 @@ use datafusion_expr::{ /// /// /// Differences with DataFusion floor: -/// - Spark's floor returns Int64 for float/integer types -/// - Spark's floor adjusts precision for Decimal128 types +/// - Spark's floor returns Int64 for float and integer inputs; DataFusion preserves +/// the input type (Float32→Float32, Float64→Float64, integers coerced to Float64) +/// - Spark's floor on Decimal128(p, s) returns Decimal128(p−s+1, 0), reducing scale +/// to 0; DataFusion preserves the original precision and scale +/// - Spark only supports Decimal128; DataFusion also supports Decimal32/64/256 +/// - Spark does not check for decimal overflow; DataFusion errors on overflow #[derive(Debug, PartialEq, Eq, Hash)] pub struct SparkFloor { signature: Signature, diff --git a/datafusion/sqllogictest/test_files/spark/math/floor.slt b/datafusion/sqllogictest/test_files/spark/math/floor.slt index d39d47ab1fee8..38718cef4014f 100644 --- a/datafusion/sqllogictest/test_files/spark/math/floor.slt +++ b/datafusion/sqllogictest/test_files/spark/math/floor.slt @@ -23,20 +23,125 @@ ## Original Query: SELECT floor(-0.1); ## PySpark 3.5.5 Result: {'FLOOR(-0.1)': Decimal('-1'), 'typeof(FLOOR(-0.1))': 'decimal(1,0)', 'typeof(-0.1)': 'decimal(1,1)'} -#query -#SELECT floor(-0.1::decimal(1,1)); +query R +SELECT floor(-0.1::decimal(1,1)); +---- +-1 ## Original Query: SELECT floor(3.1411, -3); ## PySpark 3.5.5 Result: {'floor(3.1411, -3)': Decimal('0'), 'typeof(floor(3.1411, -3))': 'decimal(4,0)', 'typeof(3.1411)': 'decimal(5,4)', 'typeof(-3)': 'int'} +## TODO: 2-argument floor(value, scale) is not yet implemented #query #SELECT floor(3.1411::decimal(5,4), -3::int); ## Original Query: SELECT floor(3.1411, 3); ## PySpark 3.5.5 Result: {'floor(3.1411, 3)': Decimal('3.141'), 'typeof(floor(3.1411, 3))': 'decimal(5,3)', 'typeof(3.1411)': 'decimal(5,4)', 'typeof(3)': 'int'} +## TODO: 2-argument floor(value, scale) is not yet implemented #query #SELECT floor(3.1411::decimal(5,4), 3::int); ## Original Query: SELECT floor(5); ## PySpark 3.5.5 Result: {'FLOOR(5)': 5, 'typeof(FLOOR(5))': 'bigint', 'typeof(5)': 'int'} -#query -#SELECT floor(5::int); +query I +SELECT floor(5::int); +---- +5 + +# Scalar input: float64 returns bigint +query IIIIIII +SELECT floor(125.9345::DOUBLE), floor(15.9999::DOUBLE), floor(0.9::DOUBLE), floor(-0.1::DOUBLE), floor(-1.999::DOUBLE), floor(123.0::DOUBLE), floor(NULL::DOUBLE); +---- +125 15 0 -1 -2 123 NULL + +# Scalar input: float32 returns bigint +query IIIIIII +SELECT floor(125.9345::FLOAT), floor(15.9999::FLOAT), floor(0.9::FLOAT), floor(-0.1::FLOAT), floor(-1.999::FLOAT), floor(123.0::FLOAT), floor(NULL::FLOAT); +---- +125 15 0 -1 -2 123 NULL + +# Scalar input: integer types all return bigint +query III +SELECT floor(5::TINYINT), floor(-3::TINYINT), floor(NULL::TINYINT); +---- +5 -3 NULL + +query III +SELECT floor(5::SMALLINT), floor(-3::SMALLINT), floor(NULL::SMALLINT); +---- +5 -3 NULL + +query III +SELECT floor(5::INT), floor(-3::INT), floor(NULL::INT); +---- +5 -3 NULL + +query III +SELECT floor(5::BIGINT), floor(-3::BIGINT), floor(NULL::BIGINT); +---- +5 -3 NULL + +# Scalar input: decimal128 with scale > 0 returns decimal with scale 0 +# floor(1.50) = 1, floor(-1.50) = -2, floor(1.00) = 1 +query RRR +SELECT floor(1.50::DECIMAL(10, 2)), floor(-1.50::DECIMAL(10, 2)), floor(1.00::DECIMAL(10, 2)); +---- +1 -2 1 + +# floor(-0.1) = -1 (rounds down away from zero for negatives) +query RR +SELECT floor(-0.1::DECIMAL(3, 1)), floor(NULL::DECIMAL(10, 2)); +---- +-1 NULL + +# floor(3.1411) = 3 +query R +SELECT floor(3.1411::DECIMAL(5, 4)); +---- +3 + +# Scalar input: decimal128 with scale = 0 passes through unchanged +query RRR +SELECT floor(5::DECIMAL(10, 0)), floor(-3::DECIMAL(10, 0)), floor(NULL::DECIMAL(10, 0)); +---- +5 -3 NULL + +# Array input: float64 +query I +SELECT floor(a) FROM (VALUES (125.9345::DOUBLE), (15.9999::DOUBLE), (0.9::DOUBLE), (-0.1::DOUBLE), (-1.999::DOUBLE), (123.0::DOUBLE), (NULL::DOUBLE)) AS t(a); +---- +125 +15 +0 +-1 +-2 +123 +NULL + +# Array input: float32 +query I +SELECT floor(a) FROM (VALUES (125.9345::FLOAT), (15.9999::FLOAT), (0.9::FLOAT), (-0.1::FLOAT), (-1.999::FLOAT), (123.0::FLOAT), (NULL::FLOAT)) AS t(a); +---- +125 +15 +0 +-1 +-2 +123 +NULL + +# Array input: integers +query I +SELECT floor(a) FROM (VALUES (5::INT), (-3::INT), (NULL::INT)) AS t(a); +---- +5 +-3 +NULL + +# Array input: decimal128 with scale > 0 +query R +SELECT floor(a) FROM (VALUES (1.50::DECIMAL(10, 2)), (-1.50::DECIMAL(10, 2)), (1.00::DECIMAL(10, 2)), (NULL::DECIMAL(10, 2))) AS t(a); +---- +1 +-2 +1 +NULL From f1f16aeaaed509683a68275388039d20fbd31e00 Mon Sep 17 00:00:00 2001 From: Shiv Bhatia Date: Tue, 10 Mar 2026 14:10:01 +0000 Subject: [PATCH 5/5] Add comment explaining difference between spark and datafusion in slt test --- .../sqllogictest/test_files/spark/math/floor.slt | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/datafusion/sqllogictest/test_files/spark/math/floor.slt b/datafusion/sqllogictest/test_files/spark/math/floor.slt index 38718cef4014f..112ccc569ef26 100644 --- a/datafusion/sqllogictest/test_files/spark/math/floor.slt +++ b/datafusion/sqllogictest/test_files/spark/math/floor.slt @@ -21,6 +21,17 @@ # For more information, please see: # https://github.com/apache/datafusion/issues/15914 +# Tests for Spark-compatible floor function. +# Spark semantics differ from DataFusion's built-in floor in two ways: +# 1. Return type: Spark returns Int64 for float/integer inputs; +# DataFusion returns the same float type (e.g. floor(1.5::DOUBLE) -> DOUBLE in DF, BIGINT in Spark) +# 2. Decimal precision: Spark adjusts precision to (p - s + 1) for Decimal128(p, s) with scale > 0; +# DataFusion preserves the original precision and scale +# +# Example: SELECT floor(1.50::DECIMAL(10,2)) +# Spark: returns Decimal(9, 0) value 1 +# DataFusion: returns Decimal(10, 2) value 1.00 + ## Original Query: SELECT floor(-0.1); ## PySpark 3.5.5 Result: {'FLOOR(-0.1)': Decimal('-1'), 'typeof(FLOOR(-0.1))': 'decimal(1,0)', 'typeof(-0.1)': 'decimal(1,1)'} query R