From 768b3e90f261c7aea58bdb98dc698b90deeeae34 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 14 Dec 2025 16:24:01 +0400 Subject: [PATCH 1/5] impl map_from_entries --- native/core/src/execution/jni_api.rs | 2 + .../apache/comet/serde/QueryPlanSerde.scala | 3 +- .../scala/org/apache/comet/serde/maps.scala | 29 +++++++++++- .../comet/CometMapExpressionSuite.scala | 45 +++++++++++++++++++ 4 files changed, 77 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index a24d993059..4f53cea3e6 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -46,6 +46,7 @@ use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; use datafusion_spark::function::hash::sha1::SparkSha1; use datafusion_spark::function::hash::sha2::SparkSha2; +use datafusion_spark::function::map::map_from_entries::MapFromEntries; use datafusion_spark::function::math::expm1::SparkExpm1; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; @@ -337,6 +338,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default())); } /// Prepares arrow arrays for output. 
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 54df2f1688..a99cf3824b 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -125,7 +125,8 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[MapKeys] -> CometMapKeys, classOf[MapEntries] -> CometMapEntries, classOf[MapValues] -> CometMapValues, - classOf[MapFromArrays] -> CometMapFromArrays) + classOf[MapFromArrays] -> CometMapFromArrays, + classOf[MapFromEntries] -> CometMapFromEntries) private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[CreateNamedStruct] -> CometCreateNamedStruct, diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala index 2e217f6af0..498aa3594c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/maps.scala +++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala @@ -19,9 +19,12 @@ package org.apache.comet.serde +import scala.annotation.tailrec + import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ArrayType, MapType} +import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, MapType, StructType} +import org.apache.comet.serde.CometArrayReverse.containsBinary import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} object CometMapKeys extends CometExpressionSerde[MapKeys] { @@ -89,3 +92,27 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] { optExprWithInfo(mapFromArraysExpr, expr, expr.children: _*) } } + +object CometMapFromEntries extends CometScalarFunction[MapFromEntries]("map_from_entries") { + val keyUnsupportedReason = "Using BinaryType as Map keys is not allowed in map_from_entries" + val 
valueUnsupportedReason = "Using BinaryType as Map values is not allowed in map_from_entries" + + private def containsBinary(dataType: DataType): Boolean = { + dataType match { + case BinaryType => true + case StructType(fields) => fields.exists(field => containsBinary(field.dataType)) + case ArrayType(elementType, _) => containsBinary(elementType) + case _ => false + } + } + + override def getSupportLevel(expr: MapFromEntries): SupportLevel = { + if (containsBinary(expr.dataType.keyType)) { + return Incompatible(Some(keyUnsupportedReason)) + } + if (containsBinary(expr.dataType.valueType)) { + return Incompatible(Some(valueUnsupportedReason)) + } + Compatible(None) + } +} diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala index 88c13391a6..01b9744ed6 100644 --- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala @@ -25,7 +25,9 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.BinaryType +import org.apache.comet.serde.CometMapFromEntries import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class CometMapExpressionSuite extends CometTestBase { @@ -125,4 +127,47 @@ class CometMapExpressionSuite extends CometTestBase { } } + test("map_from_entries") { + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + val filename = path.toString + val random = new Random(42) + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + val schemaGenOptions = + SchemaGenOptions( + generateArray = true, + generateStruct = true, + primitiveTypes = SchemaGenOptions.defaultPrimitiveTypes.filterNot(_ == BinaryType)) + val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = 
false) + ParquetGenerator.makeParquetFile( + random, + spark, + filename, + 100, + schemaGenOptions, + dataGenOptions) + } + val df = spark.read.parquet(filename) + df.createOrReplaceTempView("t1") + for (field <- df.schema.fieldNames) { + checkSparkAnswerAndOperator( + spark.sql(s"SELECT map_from_entries(array(struct($field as a, $field as b))) FROM t1")) + } + } + } + + test("map_from_entries - fallback for binary type") { + val table = "t2" + withTable(table) { + sql( + s"create table $table using parquet as select cast(array() as array) as c1 from range(10)") + checkSparkAnswerAndFallbackReason( + sql(s"select map_from_entries(array(struct(c1, 0))) from $table"), + CometMapFromEntries.keyUnsupportedReason) + checkSparkAnswerAndFallbackReason( + sql(s"select map_from_entries(array(struct(0, c1))) from $table"), + CometMapFromEntries.valueUnsupportedReason) + } + } + } From c68c3428676b5d991e7ba9e13464bf2ce1ec84e8 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 16 Dec 2025 16:10:43 +0400 Subject: [PATCH 2/5] Revert "impl map_from_entries" This reverts commit 768b3e90f261c7aea58bdb98dc698b90deeeae34. 
--- native/core/src/execution/jni_api.rs | 2 - .../apache/comet/serde/QueryPlanSerde.scala | 3 +- .../scala/org/apache/comet/serde/maps.scala | 29 +----------- .../comet/CometMapExpressionSuite.scala | 45 ------------------- 4 files changed, 2 insertions(+), 77 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 4f53cea3e6..a24d993059 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -46,7 +46,6 @@ use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; use datafusion_spark::function::hash::sha1::SparkSha1; use datafusion_spark::function::hash::sha2::SparkSha2; -use datafusion_spark::function::map::map_from_entries::MapFromEntries; use datafusion_spark::function::math::expm1::SparkExpm1; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; @@ -338,7 +337,6 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); - session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default())); } /// Prepares arrow arrays for output. 
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index a99cf3824b..54df2f1688 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -125,8 +125,7 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[MapKeys] -> CometMapKeys, classOf[MapEntries] -> CometMapEntries, classOf[MapValues] -> CometMapValues, - classOf[MapFromArrays] -> CometMapFromArrays, - classOf[MapFromEntries] -> CometMapFromEntries) + classOf[MapFromArrays] -> CometMapFromArrays) private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[CreateNamedStruct] -> CometCreateNamedStruct, diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala index 498aa3594c..2e217f6af0 100644 --- a/spark/src/main/scala/org/apache/comet/serde/maps.scala +++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala @@ -19,12 +19,9 @@ package org.apache.comet.serde -import scala.annotation.tailrec - import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, MapType, StructType} +import org.apache.spark.sql.types.{ArrayType, MapType} -import org.apache.comet.serde.CometArrayReverse.containsBinary import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} object CometMapKeys extends CometExpressionSerde[MapKeys] { @@ -92,27 +89,3 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] { optExprWithInfo(mapFromArraysExpr, expr, expr.children: _*) } } - -object CometMapFromEntries extends CometScalarFunction[MapFromEntries]("map_from_entries") { - val keyUnsupportedReason = "Using BinaryType as Map keys is not allowed in map_from_entries" - val 
valueUnsupportedReason = "Using BinaryType as Map values is not allowed in map_from_entries" - - private def containsBinary(dataType: DataType): Boolean = { - dataType match { - case BinaryType => true - case StructType(fields) => fields.exists(field => containsBinary(field.dataType)) - case ArrayType(elementType, _) => containsBinary(elementType) - case _ => false - } - } - - override def getSupportLevel(expr: MapFromEntries): SupportLevel = { - if (containsBinary(expr.dataType.keyType)) { - return Incompatible(Some(keyUnsupportedReason)) - } - if (containsBinary(expr.dataType.valueType)) { - return Incompatible(Some(valueUnsupportedReason)) - } - Compatible(None) - } -} diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala index 01b9744ed6..88c13391a6 100644 --- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala @@ -25,9 +25,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.BinaryType -import org.apache.comet.serde.CometMapFromEntries import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class CometMapExpressionSuite extends CometTestBase { @@ -127,47 +125,4 @@ class CometMapExpressionSuite extends CometTestBase { } } - test("map_from_entries") { - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - val filename = path.toString - val random = new Random(42) - withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val schemaGenOptions = - SchemaGenOptions( - generateArray = true, - generateStruct = true, - primitiveTypes = SchemaGenOptions.defaultPrimitiveTypes.filterNot(_ == BinaryType)) - val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = 
false) - ParquetGenerator.makeParquetFile( - random, - spark, - filename, - 100, - schemaGenOptions, - dataGenOptions) - } - val df = spark.read.parquet(filename) - df.createOrReplaceTempView("t1") - for (field <- df.schema.fieldNames) { - checkSparkAnswerAndOperator( - spark.sql(s"SELECT map_from_entries(array(struct($field as a, $field as b))) FROM t1")) - } - } - } - - test("map_from_entries - fallback for binary type") { - val table = "t2" - withTable(table) { - sql( - s"create table $table using parquet as select cast(array() as array) as c1 from range(10)") - checkSparkAnswerAndFallbackReason( - sql(s"select map_from_entries(array(struct(c1, 0))) from $table"), - CometMapFromEntries.keyUnsupportedReason) - checkSparkAnswerAndFallbackReason( - sql(s"select map_from_entries(array(struct(0, c1))) from $table"), - CometMapFromEntries.valueUnsupportedReason) - } - } - } From 7f26ea9e7964e7857e47d44cd75b025872d0f3b0 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Mon, 29 Dec 2025 13:31:11 +0400 Subject: [PATCH 3/5] Add unit and benchmark tests for to_json --- .../org/apache/comet/serde/structs.scala | 39 +++++------ .../comet/testing/FuzzDataGenerator.scala | 11 +-- .../comet/CometJsonExpressionSuite.scala | 37 +++++++++- .../CometJsonExpressionBenchmark.scala | 69 +++++++++++++++++-- 4 files changed, 124 insertions(+), 32 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/structs.scala b/spark/src/main/scala/org/apache/comet/serde/structs.scala index 55e031d346..b76c64bac9 100644 --- a/spark/src/main/scala/org/apache/comet/serde/structs.scala +++ b/spark/src/main/scala/org/apache/comet/serde/structs.scala @@ -111,26 +111,6 @@ object CometStructsToJson extends CometExpressionSerde[StructsToJson] { withInfo(expr, "StructsToJson with options is not supported") None } else { - - def isSupportedType(dt: DataType): Boolean = { - dt match { - case StructType(fields) => - fields.forall(f => isSupportedType(f.dataType)) - case DataTypes.BooleanType 
| DataTypes.ByteType | DataTypes.ShortType | - DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType | - DataTypes.DoubleType | DataTypes.StringType => - true - case DataTypes.DateType | DataTypes.TimestampType => - // TODO implement these types with tests for formatting options and timezone - false - case _: MapType | _: ArrayType => - // Spark supports map and array in StructsToJson but this is not yet - // implemented in Comet - false - case _ => false - } - } - val isSupported = expr.child.dataType match { case s: StructType => s.fields.forall(f => isSupportedType(f.dataType)) @@ -166,6 +146,25 @@ object CometStructsToJson extends CometExpressionSerde[StructsToJson] { } } } + + def isSupportedType(dt: DataType): Boolean = { + dt match { + case StructType(fields) => + fields.forall(f => isSupportedType(f.dataType)) + case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType | + DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType | + DataTypes.DoubleType | DataTypes.StringType => + true + case DataTypes.DateType | DataTypes.TimestampType => + // TODO implement these types with tests for formatting options and timezone + false + case _: MapType | _: ArrayType => + // Spark supports map and array in StructsToJson but this is not yet + // implemented in Comet + false + case _ => false + } + } } object CometJsonToStructs extends CometExpressionSerde[JsonToStructs] { diff --git a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala index 00a85930ba..24daebe132 100644 --- a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala +++ b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala @@ -229,8 +229,8 @@ object FuzzDataGenerator { Range(0, numRows).map(_ => { r.nextInt(20) match { case 0 if options.allowNull => null - case 1 => Float.NegativeInfinity - case 2 => Float.PositiveInfinity + case 1 if 
options.generateInfinity => Float.NegativeInfinity + case 2 if options.generateInfinity => Float.PositiveInfinity case 3 => Float.MinValue case 4 => Float.MaxValue case 5 => 0.0f @@ -243,8 +243,8 @@ object FuzzDataGenerator { Range(0, numRows).map(_ => { r.nextInt(20) match { case 0 if options.allowNull => null - case 1 => Double.NegativeInfinity - case 2 => Double.PositiveInfinity + case 1 if options.generateInfinity => Double.NegativeInfinity + case 2 if options.generateInfinity => Double.PositiveInfinity case 3 => Double.MinValue case 4 => Double.MaxValue case 5 => 0.0 @@ -329,4 +329,5 @@ case class DataGenOptions( generateNaN: Boolean = true, baseDate: Long = FuzzDataGenerator.defaultBaseDate, customStrings: Seq[String] = Seq.empty, - maxStringLength: Int = 8) + maxStringLength: Int = 8, + generateInfinity: Boolean = true) diff --git a/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala index 38f5765268..c2c1ce3663 100644 --- a/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala @@ -19,24 +19,57 @@ package org.apache.comet +import scala.util.Random + import org.scalactic.source.Position import org.scalatest.Tag +import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase -import org.apache.spark.sql.catalyst.expressions.JsonToStructs +import org.apache.spark.sql.catalyst.expressions.{JsonToStructs, StructsToJson} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.functions._ + +import org.apache.comet.serde.CometStructsToJson +import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class CometJsonExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit pos: Position): Unit = { 
super.test(testName, testTags: _*) { - withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true") { + withSQLConf( + CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true", + CometConf.getExprAllowIncompatConfigKey(classOf[StructsToJson]) -> "true") { testFun } } } + test("to_json - all supported types") { + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + val filename = path.toString + val random = new Random(42) + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + ParquetGenerator.makeParquetFile( + random, + spark, + filename, + 100, + SchemaGenOptions(generateArray = false, generateStruct = false, generateMap = false), + DataGenOptions(generateNaN = false, generateInfinity = false)) + } + val table = spark.read.parquet(filename) + val fieldsNames = table.schema.fields + .filter(sf => CometStructsToJson.isSupportedType(sf.dataType)) + .map(sf => col(sf.name)) + .toSeq + val df = table.select(to_json(struct(fieldsNames: _*))) + checkSparkAnswerAndOperator(df) + } + } + test("from_json - basic primitives") { Seq(true, false).foreach { dictionaryEnabled => withParquetTable( diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala index 5b4741ba68..c877dbf150 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.benchmark -import org.apache.spark.sql.catalyst.expressions.JsonToStructs +import org.apache.spark.sql.catalyst.expressions.{JsonToStructs, StructsToJson} import org.apache.comet.CometConf @@ -43,8 +43,9 @@ case class JsonExprConfig( // spotless:off /** * Benchmark to measure performance of Comet JSON expressions. 
To run this benchmark: - * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometJsonExpressionBenchmark` - * Results will be written to "spark/benchmarks/CometJsonExpressionBenchmark-**results.txt". + * `SPARK_GENERATE_BENCHMARK_FILES=1 make + * benchmark-org.apache.spark.sql.benchmark.CometJsonExpressionBenchmark` Results will be written + * to "spark/benchmarks/CometJsonExpressionBenchmark-**results.txt". */ // spotless:on object CometJsonExpressionBenchmark extends CometBenchmarkBase { @@ -106,6 +107,44 @@ object CometJsonExpressionBenchmark extends CometBenchmarkBase { FROM $tbl """) + case "to_json - simple primitives" => + spark.sql( + s"""SELECT named_struct("a", CAST(value AS INT), "b", concat("str_", CAST(value AS STRING))) AS json_struct FROM $tbl""") + + case "to_json - all primitive types" => + spark.sql(s""" + SELECT named_struct( + "i32", CAST(value % 1000 AS INT), + "i64", CAST(value * 1000000000L AS LONG), + "f32", CAST(value * 1.5 AS FLOAT), + "f64", CAST(value * 2.5 AS DOUBLE), + "bool", CASE WHEN value % 2 = 0 THEN true ELSE false END, + "str", concat("value_", CAST(value AS STRING)) + ) AS json_struct FROM $tbl + """) + + case "to_json - with nulls" => + spark.sql(s""" + SELECT + CASE + WHEN value % 10 = 0 THEN CAST(NULL AS STRUCT<a: INT, b: STRING>) + WHEN value % 5 = 0 THEN named_struct("a", CAST(NULL AS INT), "b", "test") + WHEN value % 3 = 0 THEN named_struct("a", CAST(123 AS INT), "b", CAST(NULL AS STRING)) + ELSE named_struct("a", CAST(value AS INT), "b", concat("str_", CAST(value AS STRING))) + END AS json_struct + FROM $tbl + """) + + case "to_json - nested struct" => + spark.sql(s""" + SELECT named_struct( + "outer", named_struct( + "inner_a", CAST(value AS INT), + "inner_b", concat("nested_", CAST(value AS STRING)) + ) + ) AS json_struct FROM $tbl + """) + case _ => spark.sql(s""" SELECT @@ -117,8 +156,9 @@ object CometJsonExpressionBenchmark extends CometBenchmarkBase { prepareTable(dir, jsonData) val extraConfigs = Map( 
+ CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true", CometConf.getExprAllowIncompatConfigKey( - classOf[JsonToStructs]) -> "true") ++ config.extraCometConfigs + classOf[StructsToJson]) -> "true") ++ config.extraCometConfigs runExpressionBenchmark(config.name, values, config.query, extraConfigs) } @@ -127,6 +167,7 @@ object CometJsonExpressionBenchmark extends CometBenchmarkBase { // Configuration for all JSON expression benchmarks private val jsonExpressions = List( + // from_json tests JsonExprConfig( "from_json - simple primitives", "a INT, b STRING", @@ -146,7 +187,25 @@ object CometJsonExpressionBenchmark extends CometBenchmarkBase { JsonExprConfig( "from_json - field access", "a INT, b STRING", - "SELECT from_json(json_str, 'a INT, b STRING').a FROM parquetV1Table")) + "SELECT from_json(json_str, 'a INT, b STRING').a FROM parquetV1Table"), + + // to_json tests + JsonExprConfig( + "to_json - simple primitives", + "a INT, b STRING", + "SELECT to_json(json_struct) FROM parquetV1Table"), + JsonExprConfig( + "to_json - all primitive types", + "i32 INT, i64 BIGINT, f32 FLOAT, f64 DOUBLE, bool BOOLEAN, str STRING", + "SELECT to_json(json_struct) FROM parquetV1Table"), + JsonExprConfig( + "to_json - with nulls", + "a INT, b STRING", + "SELECT to_json(json_struct) FROM parquetV1Table"), + JsonExprConfig( + "to_json - nested struct", + "outer STRUCT<inner_a: INT, inner_b: STRING>", + "SELECT to_json(json_struct) FROM parquetV1Table")) override def runCometBenchmark(mainArgs: Array[String]): Unit = { val values = 1024 * 1024 From 16b0c6ccc05c7508a1a889da43f57c1d5d119f3a Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Mon, 29 Dec 2025 13:42:36 +0400 Subject: [PATCH 4/5] Add unit and benchmark tests for to_json --- .../spark/sql/benchmark/CometJsonExpressionBenchmark.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala 
b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala index c877dbf150..5f1365bd76 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala @@ -43,9 +43,8 @@ case class JsonExprConfig( // spotless:off /** * Benchmark to measure performance of Comet JSON expressions. To run this benchmark: - * `SPARK_GENERATE_BENCHMARK_FILES=1 make - * benchmark-org.apache.spark.sql.benchmark.CometJsonExpressionBenchmark` Results will be written - * to "spark/benchmarks/CometJsonExpressionBenchmark-**results.txt". + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometJsonExpressionBenchmark` + * Results will be written to "spark/benchmarks/CometJsonExpressionBenchmark-**results.txt". */ // spotless:on object CometJsonExpressionBenchmark extends CometBenchmarkBase { From 4110dd5ecd1077e7ba170590d785cbbc620b12a3 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Mon, 29 Dec 2025 20:47:48 +0400 Subject: [PATCH 5/5] Fix tests --- .../test/scala/org/apache/comet/CometJsonExpressionSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala index c2c1ce3663..64c330dbdd 100644 --- a/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.{JsonToStructs, StructsToJson} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions._ +import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus import org.apache.comet.serde.CometStructsToJson import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} @@ -47,6 +48,7 @@ 
class CometJsonExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe } test("to_json - all supported types") { + assume(!isSpark40Plus) withTempDir { dir => val path = new Path(dir.toURI.toString, "test.parquet") val filename = path.toString