@@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
2626import org .apache .spark .sql .internal .SQLConf
2727import org .apache .spark .sql .types .{DataTypes , StructField , StructType }
2828
29+ import org .apache .comet .CometSparkSessionExtensions .isSpark35Plus
2930import org .apache .comet .testing .{DataGenOptions , FuzzDataGenerator }
3031
3132class CometMathExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
@@ -90,4 +91,58 @@ class CometMathExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe
9091 1000 ,
9192 DataGenOptions (generateNegativeZero = generateNegativeZero))
9293 }
94+
// Exercises width_bucket with the value, both bounds and the bucket count all read from
// columns. The rows cover an in-range value, values lying outside the bounds, and a row
// whose lower bound is greater than its upper bound (c2 > c3).
test("width_bucket") {
  assume(isSpark35Plus, "width_bucket was added in Spark 3.5")
  // NOTE(review): presumably needed so the in-memory rows below are scanned by Comet — confirm.
  withSQLConf("spark.comet.exec.localTableScan.enabled" -> "true") {
    spark
      .createDataFrame(
        Seq((5.3, 0.2, 10.6, 5), (8.1, 0.0, 5.7, 4), (-0.9, 5.2, 0.5, 2), (-2.1, 1.3, 3.4, 3)))
      .toDF("c1", "c2", "c3", "c4")
      .createOrReplaceTempView("width_bucket_test")
    // Verification is delegated to the suite helper; c1 is selected next to the bucket so
    // each result row can be matched to its input.
    checkSparkAnswerAndOperator(
      "SELECT c1, width_bucket(c1, c2, c3, c4) FROM width_bucket_test")
  }
}
107+
// Boundary inputs for width_bucket: a value sitting exactly on the upper bound (with both
// normal and reversed bounds), a degenerate range where min == max, and a bucket count of 0.
test("width_bucket - edge cases") {
  assume(isSpark35Plus, "width_bucket was added in Spark 3.5")
  // NOTE(review): presumably needed so the in-memory rows below are scanned by Comet — confirm.
  withSQLConf("spark.comet.exec.localTableScan.enabled" -> "true") {
    // Columns are (value, min, max, numBuckets), in the argument order used by the query.
    spark
      .createDataFrame(Seq(
        (0.0, 10.0, 0.0, 5), // Value equals max (reversed bounds)
        (10.0, 0.0, 10.0, 5), // Value equals max (normal bounds)
        (10.0, 0.0, 0.0, 5), // Min equals max - returns NULL
        (5.0, 0.0, 10.0, 0) // Zero buckets - returns NULL
      ))
      .toDF("c1", "c2", "c3", "c4")
      .createOrReplaceTempView("width_bucket_edge")
    checkSparkAnswerAndOperator(
      "SELECT c1, width_bucket(c1, c2, c3, c4) FROM width_bucket_edge")
  }
}
124+
// Feeds NaN into each of the three floating-point arguments in turn (value, min, max)
// while the bucket count stays fixed at the literal 5 in the query.
test("width_bucket - NaN values") {
  assume(isSpark35Plus, "width_bucket was added in Spark 3.5")
  // NOTE(review): presumably needed so the in-memory rows below are scanned by Comet — confirm.
  withSQLConf("spark.comet.exec.localTableScan.enabled" -> "true") {
    spark
      .createDataFrame(
        Seq((Double.NaN, 5.0, 0.0), (5.0, Double.NaN, 0.0), (5.0, 0.0, Double.NaN)))
      .toDF("c1", "c2", "c3")
      .createOrReplaceTempView("width_bucket_nan")
    checkSparkAnswerAndOperator("SELECT c1, width_bucket(c1, c2, c3, 5) FROM width_bucket_nan")
  }
}
136+
// Runs width_bucket over ids 0..9 produced by spark.range (cast to DOUBLE), with constant
// bounds 0.0 and 10.0 and 5 buckets given as literals in the query.
test("width_bucket - with range data") {
  assume(isSpark35Plus, "width_bucket was added in Spark 3.5")
  withSQLConf("spark.comet.exec.localTableScan.enabled" -> "true") {
    spark
      .range(10)
      .selectExpr("id", "CAST(id AS DOUBLE) as value")
      .createOrReplaceTempView("width_bucket_range")
    // ORDER BY id gives the result a fixed row order.
    checkSparkAnswerAndOperator(
      "SELECT id, width_bucket(value, 0.0, 10.0, 5) FROM width_bucket_range ORDER BY id")
  }
}
93148}
0 commit comments