From 768b3e90f261c7aea58bdb98dc698b90deeeae34 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 14 Dec 2025 16:24:01 +0400 Subject: [PATCH 1/9] impl map_from_entries --- native/core/src/execution/jni_api.rs | 2 + .../apache/comet/serde/QueryPlanSerde.scala | 3 +- .../scala/org/apache/comet/serde/maps.scala | 29 +++++++++++- .../comet/CometMapExpressionSuite.scala | 45 +++++++++++++++++++ 4 files changed, 77 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index a24d993059..4f53cea3e6 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -46,6 +46,7 @@ use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; use datafusion_spark::function::hash::sha1::SparkSha1; use datafusion_spark::function::hash::sha2::SparkSha2; +use datafusion_spark::function::map::map_from_entries::MapFromEntries; use datafusion_spark::function::math::expm1::SparkExpm1; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; @@ -337,6 +338,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default())); } /// Prepares arrow arrays for output. diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 54df2f1688..a99cf3824b 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -125,7 +125,8 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[MapKeys] -> CometMapKeys, classOf[MapEntries] -> CometMapEntries, classOf[MapValues] -> CometMapValues, - classOf[MapFromArrays] -> CometMapFromArrays) + classOf[MapFromArrays] -> CometMapFromArrays, + classOf[MapFromEntries] -> CometMapFromEntries) private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[CreateNamedStruct] -> CometCreateNamedStruct, diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala index 2e217f6af0..498aa3594c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/maps.scala +++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala @@ -19,9 +19,12 @@ package org.apache.comet.serde +import scala.annotation.tailrec + import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ArrayType, MapType} +import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, MapType, StructType} +import org.apache.comet.serde.CometArrayReverse.containsBinary import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} object CometMapKeys extends CometExpressionSerde[MapKeys] { @@ -89,3 +92,27 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] { optExprWithInfo(mapFromArraysExpr, expr, expr.children: _*) } } + +object CometMapFromEntries extends CometScalarFunction[MapFromEntries]("map_from_entries") { + val keyUnsupportedReason = "Using BinaryType as Map keys is not allowed in map_from_entries" + val valueUnsupportedReason = "Using BinaryType as Map values is not allowed in map_from_entries" + + private def containsBinary(dataType: DataType): Boolean = { + dataType match { + case BinaryType => true + case StructType(fields) => fields.exists(field => containsBinary(field.dataType)) + case ArrayType(elementType, _) => containsBinary(elementType) + case _ => false + } + } + + override def getSupportLevel(expr: MapFromEntries): SupportLevel = { + if (containsBinary(expr.dataType.keyType)) { + return Incompatible(Some(keyUnsupportedReason)) + } + if (containsBinary(expr.dataType.valueType)) { + return Incompatible(Some(valueUnsupportedReason)) + } + Compatible(None) + } +} diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala index 88c13391a6..01b9744ed6 100644 --- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala @@ -25,7 +25,9 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.BinaryType +import org.apache.comet.serde.CometMapFromEntries import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class CometMapExpressionSuite extends CometTestBase { @@ -125,4 +127,47 @@ class CometMapExpressionSuite extends CometTestBase { } } + test("map_from_entries") { + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + val filename = path.toString + val random = new Random(42) + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + val schemaGenOptions = + SchemaGenOptions( + generateArray = true, + generateStruct = true, + primitiveTypes = SchemaGenOptions.defaultPrimitiveTypes.filterNot(_ == BinaryType)) + val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = false) + ParquetGenerator.makeParquetFile( + random, + spark, + filename, + 100, + schemaGenOptions, + dataGenOptions) + } + val df = spark.read.parquet(filename) + df.createOrReplaceTempView("t1") + for (field <- df.schema.fieldNames) { + checkSparkAnswerAndOperator( + spark.sql(s"SELECT map_from_entries(array(struct($field as a, $field as b))) FROM t1")) + } + } + } + + test("map_from_entries - fallback for binary type") { + val table = "t2" + withTable(table) { + sql( + s"create table $table using parquet as select cast(array() as array) as c1 from range(10)") + checkSparkAnswerAndFallbackReason( + sql(s"select map_from_entries(array(struct(c1, 0))) from $table"), + CometMapFromEntries.keyUnsupportedReason) + checkSparkAnswerAndFallbackReason( + sql(s"select map_from_entries(array(struct(0, c1))) from $table"), + CometMapFromEntries.valueUnsupportedReason) + } + } + } From c68c3428676b5d991e7ba9e13464bf2ce1ec84e8 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 16 Dec 2025 16:10:43 +0400 Subject: [PATCH 2/9] Revert "impl map_from_entries" This reverts commit 768b3e90f261c7aea58bdb98dc698b90deeeae34. --- native/core/src/execution/jni_api.rs | 2 - .../apache/comet/serde/QueryPlanSerde.scala | 3 +- .../scala/org/apache/comet/serde/maps.scala | 29 +----------- .../comet/CometMapExpressionSuite.scala | 45 ------------------- 4 files changed, 2 insertions(+), 77 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 4f53cea3e6..a24d993059 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -46,7 +46,6 @@ use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; use datafusion_spark::function::hash::sha1::SparkSha1; use datafusion_spark::function::hash::sha2::SparkSha2; -use datafusion_spark::function::map::map_from_entries::MapFromEntries; use datafusion_spark::function::math::expm1::SparkExpm1; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; @@ -338,7 +337,6 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); - session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default())); } /// Prepares arrow arrays for output. diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index a99cf3824b..54df2f1688 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -125,8 +125,7 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[MapKeys] -> CometMapKeys, classOf[MapEntries] -> CometMapEntries, classOf[MapValues] -> CometMapValues, - classOf[MapFromArrays] -> CometMapFromArrays, - classOf[MapFromEntries] -> CometMapFromEntries) + classOf[MapFromArrays] -> CometMapFromArrays) private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[CreateNamedStruct] -> CometCreateNamedStruct, diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala index 498aa3594c..2e217f6af0 100644 --- a/spark/src/main/scala/org/apache/comet/serde/maps.scala +++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala @@ -19,12 +19,9 @@ package org.apache.comet.serde -import scala.annotation.tailrec - import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, MapType, StructType} +import org.apache.spark.sql.types.{ArrayType, MapType} -import org.apache.comet.serde.CometArrayReverse.containsBinary import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} object CometMapKeys extends CometExpressionSerde[MapKeys] { @@ -92,27 +89,3 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] { optExprWithInfo(mapFromArraysExpr, expr, expr.children: _*) } } - -object CometMapFromEntries extends CometScalarFunction[MapFromEntries]("map_from_entries") { - val keyUnsupportedReason = "Using BinaryType as Map keys is not allowed in map_from_entries" - val valueUnsupportedReason = "Using BinaryType as Map values is not allowed in map_from_entries" - - private def containsBinary(dataType: DataType): Boolean = { - dataType match { - case BinaryType => true - case StructType(fields) => fields.exists(field => containsBinary(field.dataType)) - case ArrayType(elementType, _) => containsBinary(elementType) - case _ => false - } - } - - override def getSupportLevel(expr: MapFromEntries): SupportLevel = { - if (containsBinary(expr.dataType.keyType)) { - return Incompatible(Some(keyUnsupportedReason)) - } - if (containsBinary(expr.dataType.valueType)) { - return Incompatible(Some(valueUnsupportedReason)) - } - Compatible(None) - } -} diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala index 01b9744ed6..88c13391a6 100644 --- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala @@ -25,9 +25,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.BinaryType -import org.apache.comet.serde.CometMapFromEntries import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class CometMapExpressionSuite extends CometTestBase { @@ -127,47 +125,4 @@ class CometMapExpressionSuite extends CometTestBase { } } - test("map_from_entries") { - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - val filename = path.toString - val random = new Random(42) - withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val schemaGenOptions = - SchemaGenOptions( - generateArray = true, - generateStruct = true, - primitiveTypes = SchemaGenOptions.defaultPrimitiveTypes.filterNot(_ == BinaryType)) - val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = false) - ParquetGenerator.makeParquetFile( - random, - spark, - filename, - 100, - schemaGenOptions, - dataGenOptions) - } - val df = spark.read.parquet(filename) - df.createOrReplaceTempView("t1") - for (field <- df.schema.fieldNames) { - checkSparkAnswerAndOperator( - spark.sql(s"SELECT map_from_entries(array(struct($field as a, $field as b))) FROM t1")) - } - } - } - - test("map_from_entries - fallback for binary type") { - val table = "t2" - withTable(table) { - sql( - s"create table $table using parquet as select cast(array() as array) as c1 from range(10)") - checkSparkAnswerAndFallbackReason( - sql(s"select map_from_entries(array(struct(c1, 0))) from $table"), - CometMapFromEntries.keyUnsupportedReason) - checkSparkAnswerAndFallbackReason( - sql(s"select map_from_entries(array(struct(0, c1))) from $table"), - CometMapFromEntries.valueUnsupportedReason) - } - } - } From 0ef353cb8ed938942dbc4a110a3dba2733cf2849 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Fri, 19 Dec 2025 19:13:58 +0400 Subject: [PATCH 3/9] work --- .../apache/comet/serde/QueryPlanSerde.scala | 3 +- .../scala/org/apache/comet/serde/maps.scala | 28 ++++++++++++++++++- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 54df2f1688..135546b0aa 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -125,7 +125,8 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[MapKeys] -> CometMapKeys, classOf[MapEntries] -> CometMapEntries, classOf[MapValues] -> CometMapValues, - classOf[MapFromArrays] -> CometMapFromArrays) + classOf[MapFromArrays] -> CometMapFromArrays, + classOf[CreateMap] -> CometCreateMap) private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[CreateNamedStruct] -> CometCreateNamedStruct, diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala index 2e217f6af0..5a62678a7f 100644 --- a/spark/src/main/scala/org/apache/comet/serde/maps.scala +++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala @@ -22,7 +22,7 @@ package org.apache.comet.serde import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.{ArrayType, MapType} -import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} +import org.apache.comet.serde.QueryPlanSerde.{exprToProto, exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} object CometMapKeys extends CometExpressionSerde[MapKeys] { @@ -89,3 +89,29 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] { optExprWithInfo(mapFromArraysExpr, expr, expr.children: _*) } } + +object CometCreateMap extends CometExpressionSerde[CreateMap] { + override def convert( + expr: CreateMap, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val keys = CreateArray(expr.keys) + val values = CreateArray(expr.values) + val keysProtoExpr = exprToProtoInternal(keys, inputs, binding) + val valuesProtoExpr = exprToProtoInternal(values, inputs, binding) + // scalastyle:off println + println(keysProtoExpr) + // scalastyle:on println line=102 column=4 + // scalastyle:off println + // println(valuesProtoExpr) + // scalastyle:on println line=103 column=4 + val createMapScalarExpr = + scalarFunctionExprToProtoWithReturnType( + "map", + expr.dataType, + false, + keysProtoExpr, + valuesProtoExpr) + optExprWithInfo(createMapScalarExpr, expr, expr.children: _*) + } +} From a3136edd360513bd6f4090659d55e650092a13ed Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Mon, 19 Jan 2026 21:54:32 +0400 Subject: [PATCH 4/9] work --- native/core/src/execution/planner.rs | 9 +++++ native/proto/src/proto/expr.proto | 6 +++ .../scala/org/apache/comet/serde/maps.scala | 40 ++++++++++--------- 3 files changed, 36 insertions(+), 19 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 67b2523be3..f795dd91ba 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -134,6 +134,7 @@ use num::{BigInt, ToPrimitive}; use object_store::path::Path; use std::cmp::max; use std::{collections::HashMap, sync::Arc}; +use datafusion_functions_nested::map::map; use url::Url; // For clippy error on type_complexity. @@ -676,6 +677,14 @@ impl PhysicalPlanner { ExprStruct::MonotonicallyIncreasingId(_) => Ok(Arc::new( MonotonicallyIncreasingId::from_partition_id(self.partition), )), + ExprStruct::CreateMap(expr) => { + let keys = expr.keys.iter().map(|expr| self.create_expr(expr, Arc::clone(&input_schema))) + .collect::>(); + let values = expr.values.iter().map(|expr| self.create_expr(expr, Arc::clone(&input_schema))) + .collect::, _>>()?; + let create_map = map(keys, values); + Ok(Arc::new(create_map)) + }, expr => Err(GeneralError(format!("Not implemented: {expr:?}"))), } } diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto index a7736f561a..df575adc56 100644 --- a/native/proto/src/proto/expr.proto +++ b/native/proto/src/proto/expr.proto @@ -85,6 +85,7 @@ message Expr { Rand randn = 62; EmptyExpr spark_partition_id = 63; EmptyExpr monotonically_increasing_id = 64; + CreateMap create_map = 65; } } @@ -410,3 +411,8 @@ message ArrayJoin { message Rand { int64 seed = 1; } + +message CreateMap { + repeated Expr keys = 1; + repeated Expr values = 2; +} diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala index 5a62678a7f..4c31b3c925 100644 --- a/spark/src/main/scala/org/apache/comet/serde/maps.scala +++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala @@ -19,10 +19,13 @@ package org.apache.comet.serde +import scala.jdk.CollectionConverters._ + import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.{ArrayType, MapType} -import org.apache.comet.serde.QueryPlanSerde.{exprToProto, exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} +import org.apache.comet.CometSparkSessionExtensions.withInfo +import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} object CometMapKeys extends CometExpressionSerde[MapKeys] { @@ -95,23 +98,22 @@ object CometCreateMap extends CometExpressionSerde[CreateMap] { expr: CreateMap, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { - val keys = CreateArray(expr.keys) - val values = CreateArray(expr.values) - val keysProtoExpr = exprToProtoInternal(keys, inputs, binding) - val valuesProtoExpr = exprToProtoInternal(values, inputs, binding) - // scalastyle:off println - println(keysProtoExpr) - // scalastyle:on println line=102 column=4 - // scalastyle:off println - // println(valuesProtoExpr) - // scalastyle:on println line=103 column=4 - val createMapScalarExpr = - scalarFunctionExprToProtoWithReturnType( - "map", - expr.dataType, - false, - keysProtoExpr, - valuesProtoExpr) - optExprWithInfo(createMapScalarExpr, expr, expr.children: _*) + val keysProtoExpr = expr.keys.map(exprToProtoInternal(_, inputs, binding)) + val valuesProtoExpr = expr.values.map(exprToProtoInternal(_, inputs, binding)) + if (keysProtoExpr.forall(_.isDefined) && valuesProtoExpr.forall(_.isDefined)) { + val createMapProtoExpr = ExprOuterClass.CreateMap + .newBuilder() + .addAllValues(keysProtoExpr.map(_.get).asJava) + .addAllValues(valuesProtoExpr.map(_.get).asJava) + .build() + Some( + ExprOuterClass.Expr + .newBuilder() + .setCreateMap(createMapProtoExpr) + .build()) + } else { + withInfo(expr, expr.children: _*) + None + } } } From c0d5c9114129fa709654765a096b6f21cc48a83e Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 20 Jan 2026 18:00:12 +0400 Subject: [PATCH 5/9] work --- native/core/src/execution/jni_api.rs | 2 + native/core/src/execution/planner.rs | 9 ---- .../scala/org/apache/comet/serde/maps.scala | 49 ++++++++++++------- .../comet/CometMapExpressionSuite.scala | 26 ++++++++++ 4 files changed, 59 insertions(+), 27 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index aaed1fc74f..45e045ee33 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -67,6 +67,7 @@ use std::collections::HashMap; use std::path::PathBuf; use std::time::{Duration, Instant}; use std::{sync::Arc, task::Poll}; +use datafusion_spark::function::map::map_from_arrays::MapFromArrays; use tokio::runtime::Runtime; use crate::execution::memory_pools::{ @@ -339,6 +340,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkHex::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromArrays::default())); } /// Prepares arrow arrays for output. diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index f795dd91ba..67b2523be3 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -134,7 +134,6 @@ use num::{BigInt, ToPrimitive}; use object_store::path::Path; use std::cmp::max; use std::{collections::HashMap, sync::Arc}; -use datafusion_functions_nested::map::map; use url::Url; // For clippy error on type_complexity. @@ -677,14 +676,6 @@ impl PhysicalPlanner { ExprStruct::MonotonicallyIncreasingId(_) => Ok(Arc::new( MonotonicallyIncreasingId::from_partition_id(self.partition), )), - ExprStruct::CreateMap(expr) => { - let keys = expr.keys.iter().map(|expr| self.create_expr(expr, Arc::clone(&input_schema))) - .collect::>(); - let values = expr.values.iter().map(|expr| self.create_expr(expr, Arc::clone(&input_schema))) - .collect::, _>>()?; - let create_map = map(keys, values); - Ok(Arc::new(create_map)) - }, expr => Err(GeneralError(format!("Not implemented: {expr:?}"))), } } diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala index 4c31b3c925..a8fbd32847 100644 --- a/spark/src/main/scala/org/apache/comet/serde/maps.scala +++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala @@ -22,10 +22,11 @@ package org.apache.comet.serde import scala.jdk.CollectionConverters._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ArrayType, MapType} +import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, MapType, StructType} +import org.apache.spark.sql.types.DataTypes.{BinaryType, BooleanType, ByteType, DateType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, TimestampNTZType, TimestampType} import org.apache.comet.CometSparkSessionExtensions.withInfo -import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} +import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType, serializeDataType} object CometMapKeys extends CometExpressionSerde[MapKeys] { @@ -94,26 +95,38 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] { } object CometCreateMap extends CometExpressionSerde[CreateMap] { + + override def getSupportLevel(expr: CreateMap): SupportLevel = { + Compatible(None) + } + override def convert( expr: CreateMap, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { - val keysProtoExpr = expr.keys.map(exprToProtoInternal(_, inputs, binding)) - val valuesProtoExpr = expr.values.map(exprToProtoInternal(_, inputs, binding)) - if (keysProtoExpr.forall(_.isDefined) && valuesProtoExpr.forall(_.isDefined)) { - val createMapProtoExpr = ExprOuterClass.CreateMap - .newBuilder() - .addAllValues(keysProtoExpr.map(_.get).asJava) - .addAllValues(valuesProtoExpr.map(_.get).asJava) - .build() - Some( - ExprOuterClass.Expr - .newBuilder() - .setCreateMap(createMapProtoExpr) - .build()) - } else { - withInfo(expr, expr.children: _*) - None + val keysArray = CreateArray(expr.keys) + val valuesArray = CreateArray(expr.values) + val keysExprProto = exprToProtoInternal(keysArray, inputs, binding) + val valuesExprProto = exprToProtoInternal(valuesArray, inputs, binding) + val createMapExprProto = + scalarFunctionExprToProtoWithReturnType( + "map_from_arrays", + expr.dataType, + false, + keysExprProto, + valuesExprProto) + optExprWithInfo(createMapExprProto, expr, expr.children: _*) + } +} + +sealed trait MapBase { + + def containsBinary(dataType: DataType): Boolean = { + dataType match { + case BinaryType => true + case StructType(fields) => fields.exists(field => containsBinary(field.dataType)) + case ArrayType(elementType, _) => containsBinary(elementType) + case _ => false } } } diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala index 88c13391a6..f428f9fe9d 100644 --- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.BinaryType import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} @@ -125,4 +126,29 @@ class CometMapExpressionSuite extends CometTestBase { } } + test("create_map") { + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + val filename = path.toString + val random = new Random(42) + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + val schemaGenOptions = + SchemaGenOptions(generateArray = false, generateStruct = false, generateMap = false) + val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = false) + ParquetGenerator.makeParquetFile( + random, + spark, + filename, + 100, + schemaGenOptions, + dataGenOptions) + } + val df = spark.read.parquet(filename) + df.createOrReplaceTempView("t1") + for (fieldName <- df.schema.filter(_.dataType != BinaryType).map(_.name)) { + checkSparkAnswerAndOperator(spark.sql(s"SELECT map($fieldName, $fieldName) FROM t1")) + } + } + } + } From e1ebc09aa67fde9d6c59f551c7bfee4f92447827 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 20 Jan 2026 22:11:35 +0400 Subject: [PATCH 6/9] feat: create map --- docs/source/user-guide/latest/configs.md | 1 + native/core/src/execution/jni_api.rs | 2 +- .../scala/org/apache/comet/serde/maps.scala | 13 ++- .../comet/CometMapExpressionSuite.scala | 23 +++++- .../CometMapExpressionBenchmark.scala | 79 +++++++++++++++++++ 5 files changed, 112 insertions(+), 6 deletions(-) create mode 100644 spark/src/test/scala/org/apache/spark/sql/benchmark/CometMapExpressionBenchmark.scala diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index 5eea5c4e5d..529ea423d0 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -232,6 +232,7 @@ These settings can be used to determine which parts of the plan are accelerated | `spark.comet.expression.Cosh.enabled` | Enable Comet acceleration for `Cosh` | true | | `spark.comet.expression.Cot.enabled` | Enable Comet acceleration for `Cot` | true | | `spark.comet.expression.CreateArray.enabled` | Enable Comet acceleration for `CreateArray` | true | +| `spark.comet.expression.CreateMap.enabled` | Enable Comet acceleration for `CreateMap` | true | | `spark.comet.expression.CreateNamedStruct.enabled` | Enable Comet acceleration for `CreateNamedStruct` | true | | `spark.comet.expression.DateAdd.enabled` | Enable Comet acceleration for `DateAdd` | true | | `spark.comet.expression.DateFormatClass.enabled` | Enable Comet acceleration for `DateFormatClass` | true | diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index bd175109cf..8704d49f00 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -46,6 +46,7 @@ use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; use datafusion_spark::function::hash::sha1::SparkSha1; use datafusion_spark::function::hash::sha2::SparkSha2; +use datafusion_spark::function::map::map_from_arrays::MapFromArrays; use datafusion_spark::function::math::expm1::SparkExpm1; use datafusion_spark::function::math::hex::SparkHex; use datafusion_spark::function::string::char::CharFunc; @@ -67,7 +68,6 @@ use std::collections::HashMap; use std::path::PathBuf; use std::time::{Duration, Instant}; use std::{sync::Arc, task::Poll}; -use datafusion_spark::function::map::map_from_arrays::MapFromArrays; use tokio::runtime::Runtime; use crate::execution::memory_pools::{ diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala index a8fbd32847..79c3e5f8c5 100644 --- a/spark/src/main/scala/org/apache/comet/serde/maps.scala +++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala @@ -94,9 +94,18 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] { } } -object CometCreateMap extends CometExpressionSerde[CreateMap] { +object CometCreateMap extends CometExpressionSerde[CreateMap] with MapBase { + val keyUnsupportedReason = "Using BinaryType as Map keys is not allowed in create map function" + val valueUnsupportedReason = + "Using BinaryType as Map values is not allowed in create map function" override def getSupportLevel(expr: CreateMap): SupportLevel = { + if (containsBinary(expr.dataType.keyType)) { + return Incompatible(Some(keyUnsupportedReason)) + } + if (containsBinary(expr.dataType.valueType)) { + return Incompatible(Some(valueUnsupportedReason)) + } Compatible(None) } @@ -121,7 +130,7 @@ object CometCreateMap extends CometExpressionSerde[CreateMap] { sealed trait MapBase { - def containsBinary(dataType: DataType): Boolean = { + protected def containsBinary(dataType: DataType): Boolean = { dataType match { case BinaryType => true case StructType(fields) => fields.exists(field => containsBinary(field.dataType)) diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala index 8f00246c88..a676479210 100644 --- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala @@ -27,7 +27,8 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.BinaryType -import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} +import org.apache.comet.serde.CometCreateMap +import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, ParquetGenerator, SchemaGenOptions} class CometMapExpressionSuite extends CometTestBase { @@ -165,7 +166,10 @@ class CometMapExpressionSuite extends CometTestBase { val random = new Random(42) withSQLConf(CometConf.COMET_ENABLED.key -> "false") { val schemaGenOptions = - SchemaGenOptions(generateArray = false, generateStruct = false, generateMap = false) + SchemaGenOptions( + generateArray = true, + generateStruct = true, + primitiveTypes = SchemaGenOptions.defaultPrimitiveTypes.filterNot(_ == BinaryType)) val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = false) ParquetGenerator.makeParquetFile( random, @@ -177,10 +181,23 @@ class CometMapExpressionSuite extends CometTestBase { } val df = spark.read.parquet(filename) df.createOrReplaceTempView("t1") - for (fieldName <- df.schema.filter(_.dataType != BinaryType).map(_.name)) { + for (fieldName <- df.schema.fieldNames) { checkSparkAnswerAndOperator(spark.sql(s"SELECT map($fieldName, $fieldName) FROM t1")) } } } + test("create_map - fallback for binary type") { + val table = "t2" + withTable(table) { + sql( + s"create table $table using parquet as select cast('abc' as binary) as c1 from range(10)") + checkSparkAnswerAndFallbackReason( + sql(s"select map(c1, 1) from $table"), + CometCreateMap.keyUnsupportedReason) + checkSparkAnswerAndFallbackReason( + sql(s"select map(1, c1) from $table"), + CometCreateMap.valueUnsupportedReason) + } + } } diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMapExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMapExpressionBenchmark.scala new file mode 100644 index 0000000000..bae89219da --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMapExpressionBenchmark.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +import org.apache.comet.CometConf + +/** + * Configuration for a map expression benchmark. + * + * @param name + * Name for the benchmark + * @param query + * SQL query to benchmark + * @param extraCometConfigs + * Additional Comet configurations for the scan+exec case + */ +case class MapExprConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +/** + * Benchmark to measure performance of Comet map expressions. To run this benchmark: + * {{{ + * SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometMapExpressionBenchmark + * }}} + * Results will be written to "spark/benchmarks/CometMapExpressionBenchmark-**results.txt". + */ +object CometMapExpressionBenchmark extends CometBenchmarkBase { + + private val mapExpressions = List( + MapExprConfig( + "create_map", + "select map(c1, c1, c2, c2, c3, c3, c4, c4, c5, c5) from parquetV1Table")) + + override def runCometBenchmark(args: Array[String]): Unit = { + runBenchmarkWithTable("Map expressions", 1024) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql( + s"SELECT " + + s"(value + 0) AS C1, " + + s"(value + 10) AS C2, " + + s"(value + 20) AS C3, " + + s"(value + 30) AS C4, " + + s"(value + 40) AS C5 FROM $tbl")) + + val extraConfigs = Map(CometConf.COMET_CASE_CONVERSION_ENABLED.key -> "true") + + mapExpressions.foreach { config => + val allConfigs = extraConfigs ++ config.extraCometConfigs + runBenchmark(config.name) { + runExpressionBenchmark(config.name, v, config.query, allConfigs) + } + } + } + } + } + } +} From 74c91a445a63c57e78524b64198be0828fdc8921 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Thu, 22 Jan 2026 21:13:25 +0400 Subject: [PATCH 7/9] Fix fmt --- .../sql/benchmark/CometMapExpressionBenchmark.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMapExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMapExpressionBenchmark.scala index bae89219da..e0a16d3295 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMapExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMapExpressionBenchmark.scala @@ -57,11 +57,11 @@ object CometMapExpressionBenchmark extends CometBenchmarkBase { prepareTable( dir, spark.sql( - s"SELECT " + - s"(value + 0) AS C1, " + - s"(value + 10) AS C2, " + - s"(value + 20) AS C3, " + - s"(value + 30) AS C4, " + + "SELECT " + + "(value + 0) AS C1, " + + "(value + 10) AS C2, " + + "(value + 20) AS C3, " + + "(value + 30) AS C4, " + s"(value + 40) AS C5 FROM $tbl")) val extraConfigs = Map(CometConf.COMET_CASE_CONVERSION_ENABLED.key -> "true") From 480582c92568ca278a143b48798997057f20a2fb Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Thu, 22 Jan 2026 21:14:32 +0400 Subject: [PATCH 8/9] Fix fmt --- .../test/scala/org/apache/comet/CometMapExpressionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala index a676479210..675dde2f6e 100644 --- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.BinaryType import org.apache.comet.serde.CometCreateMap -import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, ParquetGenerator, SchemaGenOptions} +import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class CometMapExpressionSuite extends CometTestBase { From 7da3deea6e2f297172fe002d3425591da62005e5 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Thu, 22 Jan 2026 21:14:57 +0400 Subject: [PATCH 9/9] Fix fmt --- spark/src/main/scala/org/apache/comet/serde/maps.scala | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala index 79c3e5f8c5..e7ec1f09a8 100644 --- a/spark/src/main/scala/org/apache/comet/serde/maps.scala +++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala @@ -19,14 +19,11 @@ package org.apache.comet.serde -import scala.jdk.CollectionConverters._ - import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, MapType, StructType} -import org.apache.spark.sql.types.DataTypes.{BinaryType, BooleanType, ByteType, DateType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, TimestampNTZType, TimestampType} +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} +import org.apache.spark.sql.types.DataTypes.BinaryType -import org.apache.comet.CometSparkSessionExtensions.withInfo -import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType, serializeDataType} +import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} object CometMapKeys extends CometExpressionSerde[MapKeys] {