Skip to content

Commit 7f26ea9

Browse files
author
Kazantsev Maksim
committed
Add unit and benchmark tests for to_json
1 parent 9500bbb commit 7f26ea9

4 files changed

Lines changed: 124 additions & 32 deletions

File tree

spark/src/main/scala/org/apache/comet/serde/structs.scala

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -111,26 +111,6 @@ object CometStructsToJson extends CometExpressionSerde[StructsToJson] {
111111
withInfo(expr, "StructsToJson with options is not supported")
112112
None
113113
} else {
114-
115-
def isSupportedType(dt: DataType): Boolean = {
116-
dt match {
117-
case StructType(fields) =>
118-
fields.forall(f => isSupportedType(f.dataType))
119-
case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType |
120-
DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType |
121-
DataTypes.DoubleType | DataTypes.StringType =>
122-
true
123-
case DataTypes.DateType | DataTypes.TimestampType =>
124-
// TODO implement these types with tests for formatting options and timezone
125-
false
126-
case _: MapType | _: ArrayType =>
127-
// Spark supports map and array in StructsToJson but this is not yet
128-
// implemented in Comet
129-
false
130-
case _ => false
131-
}
132-
}
133-
134114
val isSupported = expr.child.dataType match {
135115
case s: StructType =>
136116
s.fields.forall(f => isSupportedType(f.dataType))
@@ -166,6 +146,25 @@ object CometStructsToJson extends CometExpressionSerde[StructsToJson] {
166146
}
167147
}
168148
}
149+
150+
/**
 * Reports whether `dt` is a data type this StructsToJson serde accepts.
 *
 * A struct is accepted only if every one of its field types is accepted (checked recursively).
 * Boolean, byte, short, integer, long, float, double and string are accepted. Everything else,
 * including date, timestamp, map and array, is rejected.
 *
 * @param dt
 *   the Spark data type to check
 * @return
 *   true if the type is accepted, false otherwise
 */
def isSupportedType(dt: DataType): Boolean = {
151+
dt match {
152+
case StructType(fields) =>
153+
fields.forall(f => isSupportedType(f.dataType))
154+
case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType |
155+
DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType |
156+
DataTypes.DoubleType | DataTypes.StringType =>
157+
true
158+
case DataTypes.DateType | DataTypes.TimestampType =>
159+
// TODO implement these types with tests for formatting options and timezone
160+
false
161+
case _: MapType | _: ArrayType =>
162+
// Spark supports map and array in StructsToJson but this is not yet
163+
// implemented in Comet
164+
false
165+
case _ => false
166+
}
167+
}
169168
}
170169

171170
object CometJsonToStructs extends CometExpressionSerde[JsonToStructs] {

spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,8 @@ object FuzzDataGenerator {
229229
Range(0, numRows).map(_ => {
230230
r.nextInt(20) match {
231231
case 0 if options.allowNull => null
232-
case 1 => Float.NegativeInfinity
233-
case 2 => Float.PositiveInfinity
232+
case 1 if options.generateInfinity => Float.NegativeInfinity
233+
case 2 if options.generateInfinity => Float.PositiveInfinity
234234
case 3 => Float.MinValue
235235
case 4 => Float.MaxValue
236236
case 5 => 0.0f
@@ -243,8 +243,8 @@ object FuzzDataGenerator {
243243
Range(0, numRows).map(_ => {
244244
r.nextInt(20) match {
245245
case 0 if options.allowNull => null
246-
case 1 => Double.NegativeInfinity
247-
case 2 => Double.PositiveInfinity
246+
case 1 if options.generateInfinity => Double.NegativeInfinity
247+
case 2 if options.generateInfinity => Double.PositiveInfinity
248248
case 3 => Double.MinValue
249249
case 4 => Double.MaxValue
250250
case 5 => 0.0
@@ -329,4 +329,5 @@ case class DataGenOptions(
329329
generateNaN: Boolean = true,
330330
baseDate: Long = FuzzDataGenerator.defaultBaseDate,
331331
customStrings: Seq[String] = Seq.empty,
332-
maxStringLength: Int = 8)
332+
maxStringLength: Int = 8,
333+
generateInfinity: Boolean = true)

spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,57 @@
1919

2020
package org.apache.comet
2121

22+
import scala.util.Random
23+
2224
import org.scalactic.source.Position
2325
import org.scalatest.Tag
2426

27+
import org.apache.hadoop.fs.Path
2528
import org.apache.spark.sql.CometTestBase
26-
import org.apache.spark.sql.catalyst.expressions.JsonToStructs
29+
import org.apache.spark.sql.catalyst.expressions.{JsonToStructs, StructsToJson}
2730
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
31+
import org.apache.spark.sql.functions._
32+
33+
import org.apache.comet.serde.CometStructsToJson
34+
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
2835

2936
class CometJsonExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
3037

3138
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
3239
pos: Position): Unit = {
3340
super.test(testName, testTags: _*) {
34-
withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true") {
41+
withSQLConf(
42+
CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true",
43+
CometConf.getExprAllowIncompatConfigKey(classOf[StructsToJson]) -> "true") {
3544
testFun
3645
}
3746
}
3847
}
3948

49+
// Writes a Parquet file of generated data, then runs to_json over a struct built from every
// column whose type CometStructsToJson.isSupportedType accepts.
test("to_json - all supported types") {
50+
withTempDir { dir =>
51+
val path = new Path(dir.toURI.toString, "test.parquet")
52+
val filename = path.toString
53+
// Fixed seed for the random source passed to the generator.
val random = new Random(42)
54+
// Comet is disabled only while the input file is being generated.
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
55+
ParquetGenerator.makeParquetFile(
56+
random,
57+
spark,
58+
filename,
59+
100,
60+
// Array, struct and map columns are excluded from the generated schema.
SchemaGenOptions(generateArray = false, generateStruct = false, generateMap = false),
61+
// NOTE(review): NaN and infinity values are excluded from the generated data; the reason
// is not visible here (presumably their JSON rendering differs) - confirm.
DataGenOptions(generateNaN = false, generateInfinity = false))
62+
}
63+
val table = spark.read.parquet(filename)
64+
// Keep only the columns whose type the serde reports as supported.
val fieldsNames = table.schema.fields
65+
.filter(sf => CometStructsToJson.isSupportedType(sf.dataType))
66+
.map(sf => col(sf.name))
67+
.toSeq
68+
val df = table.select(to_json(struct(fieldsNames: _*)))
69+
checkSparkAnswerAndOperator(df)
70+
}
71+
}
72+
4073
test("from_json - basic primitives") {
4174
Seq(true, false).foreach { dictionaryEnabled =>
4275
withParquetTable(

spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
package org.apache.spark.sql.benchmark
2121

22-
import org.apache.spark.sql.catalyst.expressions.JsonToStructs
22+
import org.apache.spark.sql.catalyst.expressions.{JsonToStructs, StructsToJson}
2323

2424
import org.apache.comet.CometConf
2525

@@ -43,8 +43,9 @@ case class JsonExprConfig(
4343
// spotless:off
4444
/**
4545
* Benchmark to measure performance of Comet JSON expressions. To run this benchmark:
46-
* `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometJsonExpressionBenchmark`
47-
* Results will be written to "spark/benchmarks/CometJsonExpressionBenchmark-**results.txt".
46+
* `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometJsonExpressionBenchmark`
47+
* Results will be written to
48+
* "spark/benchmarks/CometJsonExpressionBenchmark-**results.txt".
4849
*/
4950
// spotless:on
5051
object CometJsonExpressionBenchmark extends CometBenchmarkBase {
@@ -106,6 +107,44 @@ object CometJsonExpressionBenchmark extends CometBenchmarkBase {
106107
FROM $tbl
107108
""")
108109

110+
case "to_json - simple primitives" =>
111+
spark.sql(
112+
s"""SELECT named_struct("a", CAST(value AS INT), "b", concat("str_", CAST(value AS STRING))) AS json_struct FROM $tbl""")
113+
114+
case "to_json - all primitive types" =>
115+
spark.sql(s"""
116+
SELECT named_struct(
117+
"i32", CAST(value % 1000 AS INT),
118+
"i64", CAST(value * 1000000000L AS LONG),
119+
"f32", CAST(value * 1.5 AS FLOAT),
120+
"f64", CAST(value * 2.5 AS DOUBLE),
121+
"bool", CASE WHEN value % 2 = 0 THEN true ELSE false END,
122+
"str", concat("value_", CAST(value AS STRING))
123+
) AS json_struct FROM $tbl
124+
""")
125+
126+
case "to_json - with nulls" =>
127+
spark.sql(s"""
128+
SELECT
129+
CASE
130+
WHEN value % 10 = 0 THEN CAST(NULL AS STRUCT<a: INT, b: STRING>)
131+
WHEN value % 5 = 0 THEN named_struct("a", CAST(NULL AS INT), "b", "test")
132+
WHEN value % 3 = 0 THEN named_struct("a", CAST(123 AS INT), "b", CAST(NULL AS STRING))
133+
ELSE named_struct("a", CAST(value AS INT), "b", concat("str_", CAST(value AS STRING)))
134+
END AS json_struct
135+
FROM $tbl
136+
""")
137+
138+
case "to_json - nested struct" =>
139+
spark.sql(s"""
140+
SELECT named_struct(
141+
"outer", named_struct(
142+
"inner_a", CAST(value AS INT),
143+
"inner_b", concat("nested_", CAST(value AS STRING))
144+
)
145+
) AS json_struct FROM $tbl
146+
""")
147+
109148
case _ =>
110149
spark.sql(s"""
111150
SELECT
@@ -117,8 +156,9 @@ object CometJsonExpressionBenchmark extends CometBenchmarkBase {
117156
prepareTable(dir, jsonData)
118157

119158
val extraConfigs = Map(
159+
CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true",
120160
CometConf.getExprAllowIncompatConfigKey(
121-
classOf[JsonToStructs]) -> "true") ++ config.extraCometConfigs
161+
classOf[StructsToJson]) -> "true") ++ config.extraCometConfigs
122162

123163
runExpressionBenchmark(config.name, values, config.query, extraConfigs)
124164
}
@@ -127,6 +167,7 @@ object CometJsonExpressionBenchmark extends CometBenchmarkBase {
127167

128168
// Configuration for all JSON expression benchmarks
129169
private val jsonExpressions = List(
170+
// from_json tests
130171
JsonExprConfig(
131172
"from_json - simple primitives",
132173
"a INT, b STRING",
@@ -146,7 +187,25 @@ object CometJsonExpressionBenchmark extends CometBenchmarkBase {
146187
JsonExprConfig(
147188
"from_json - field access",
148189
"a INT, b STRING",
149-
"SELECT from_json(json_str, 'a INT, b STRING').a FROM parquetV1Table"))
190+
"SELECT from_json(json_str, 'a INT, b STRING').a FROM parquetV1Table"),
191+
192+
// to_json tests
193+
JsonExprConfig(
194+
"to_json - simple primitives",
195+
"a INT, b STRING",
196+
"SELECT to_json(json_struct) FROM parquetV1Table"),
197+
JsonExprConfig(
198+
"to_json - all primitive types",
199+
"i32 INT, i64 BIGINT, f32 FLOAT, f64 DOUBLE, bool BOOLEAN, str STRING",
200+
"SELECT to_json(json_struct) FROM parquetV1Table"),
201+
JsonExprConfig(
202+
"to_json - with nulls",
203+
"a INT, b STRING",
204+
"SELECT to_json(json_struct) FROM parquetV1Table"),
205+
JsonExprConfig(
206+
"to_json - nested struct",
207+
"outer STRUCT<inner_a: INT, inner_b: STRING>",
208+
"SELECT to_json(json_struct) FROM parquetV1Table"))
150209

151210
override def runCometBenchmark(mainArgs: Array[String]): Unit = {
152211
val values = 1024 * 1024

0 commit comments

Comments
 (0)