Skip to content

Commit e1ebc09

Browse files
author
Kazantsev Maksim
committed
feat: create map
1 parent 9be3e51 commit e1ebc09

5 files changed

Lines changed: 112 additions & 6 deletions

File tree

docs/source/user-guide/latest/configs.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ These settings can be used to determine which parts of the plan are accelerated
232232
| `spark.comet.expression.Cosh.enabled` | Enable Comet acceleration for `Cosh` | true |
233233
| `spark.comet.expression.Cot.enabled` | Enable Comet acceleration for `Cot` | true |
234234
| `spark.comet.expression.CreateArray.enabled` | Enable Comet acceleration for `CreateArray` | true |
235+
| `spark.comet.expression.CreateMap.enabled` | Enable Comet acceleration for `CreateMap` | true |
235236
| `spark.comet.expression.CreateNamedStruct.enabled` | Enable Comet acceleration for `CreateNamedStruct` | true |
236237
| `spark.comet.expression.DateAdd.enabled` | Enable Comet acceleration for `DateAdd` | true |
237238
| `spark.comet.expression.DateFormatClass.enabled` | Enable Comet acceleration for `DateFormatClass` | true |

native/core/src/execution/jni_api.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use datafusion_spark::function::datetime::date_add::SparkDateAdd;
4646
use datafusion_spark::function::datetime::date_sub::SparkDateSub;
4747
use datafusion_spark::function::hash::sha1::SparkSha1;
4848
use datafusion_spark::function::hash::sha2::SparkSha2;
49+
use datafusion_spark::function::map::map_from_arrays::MapFromArrays;
4950
use datafusion_spark::function::math::expm1::SparkExpm1;
5051
use datafusion_spark::function::math::hex::SparkHex;
5152
use datafusion_spark::function::string::char::CharFunc;
@@ -67,7 +68,6 @@ use std::collections::HashMap;
6768
use std::path::PathBuf;
6869
use std::time::{Duration, Instant};
6970
use std::{sync::Arc, task::Poll};
70-
use datafusion_spark::function::map::map_from_arrays::MapFromArrays;
7171
use tokio::runtime::Runtime;
7272

7373
use crate::execution::memory_pools::{

spark/src/main/scala/org/apache/comet/serde/maps.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,18 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] {
9494
}
9595
}
9696

97-
object CometCreateMap extends CometExpressionSerde[CreateMap] {
97+
object CometCreateMap extends CometExpressionSerde[CreateMap] with MapBase {
98+
val keyUnsupportedReason = "Using BinaryType as Map keys is not allowed in create map function"
99+
val valueUnsupportedReason =
100+
"Using BinaryType as Map values is not allowed in create map function"
98101

99102
/**
 * Decides whether a `CreateMap` expression can be handled by Comet.
 *
 * The key type is inspected before the value type; the first one found to contain a
 * binary type determines the reported reason.
 *
 * @param expr
 *   the `CreateMap` expression under inspection
 * @return
 *   `Incompatible` carrying the key or value reason when the corresponding type contains
 *   binary data, otherwise `Compatible(None)`
 */
override def getSupportLevel(expr: CreateMap): SupportLevel = {
  val mapType = expr.dataType
  if (containsBinary(mapType.keyType)) {
    Incompatible(Some(keyUnsupportedReason))
  } else if (containsBinary(mapType.valueType)) {
    Incompatible(Some(valueUnsupportedReason))
  } else {
    Compatible(None)
  }
}
102111

@@ -121,7 +130,7 @@ object CometCreateMap extends CometExpressionSerde[CreateMap] {
121130

122131
sealed trait MapBase {
123132

124-
def containsBinary(dataType: DataType): Boolean = {
133+
protected def containsBinary(dataType: DataType): Boolean = {
125134
dataType match {
126135
case BinaryType => true
127136
case StructType(fields) => fields.exists(field => containsBinary(field.dataType))

spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ import org.apache.spark.sql.functions._
2727
import org.apache.spark.sql.internal.SQLConf
2828
import org.apache.spark.sql.types.BinaryType
2929

30-
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
30+
import org.apache.comet.serde.CometCreateMap
31+
import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, ParquetGenerator, SchemaGenOptions}
3132

3233
class CometMapExpressionSuite extends CometTestBase {
3334

@@ -165,7 +166,10 @@ class CometMapExpressionSuite extends CometTestBase {
165166
val random = new Random(42)
166167
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
167168
val schemaGenOptions =
168-
SchemaGenOptions(generateArray = false, generateStruct = false, generateMap = false)
169+
SchemaGenOptions(
170+
generateArray = true,
171+
generateStruct = true,
172+
primitiveTypes = SchemaGenOptions.defaultPrimitiveTypes.filterNot(_ == BinaryType))
169173
val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = false)
170174
ParquetGenerator.makeParquetFile(
171175
random,
@@ -177,10 +181,23 @@ class CometMapExpressionSuite extends CometTestBase {
177181
}
178182
val df = spark.read.parquet(filename)
179183
df.createOrReplaceTempView("t1")
180-
for (fieldName <- df.schema.filter(_.dataType != BinaryType).map(_.name)) {
184+
for (fieldName <- df.schema.fieldNames) {
181185
checkSparkAnswerAndOperator(spark.sql(s"SELECT map($fieldName, $fieldName) FROM t1"))
182186
}
183187
}
184188
}
185189

190+
// CometCreateMap reports binary map keys and binary map values as incompatible, each with
// its own reason string; this test runs one query per case and checks the reported reason.
test("create_map - fallback for binary type") {
  val tableName = "t2"
  withTable(tableName) {
    sql(
      s"create table $tableName using parquet as select cast('abc' as binary) as c1 from range(10)")

    // (query, expected fallback reason): binary column used as the key, then as the value.
    val fallbackCases = Seq(
      s"select map(c1, 1) from $tableName" -> CometCreateMap.keyUnsupportedReason,
      s"select map(1, c1) from $tableName" -> CometCreateMap.valueUnsupportedReason)

    for ((query, expectedReason) <- fallbackCases) {
      checkSparkAnswerAndFallbackReason(sql(query), expectedReason)
    }
  }
}
186203
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.benchmark
21+
22+
import org.apache.comet.CometConf
23+
24+
/**
25+
* Configuration for a map expression benchmark.
26+
*
27+
* @param name
28+
* Name for the benchmark
29+
* @param query
30+
* SQL query to benchmark
31+
* @param extraCometConfigs
32+
* Additional Comet configurations for the scan+exec case
33+
*/
34+
case class MapExprConfig(
35+
name: String,
36+
query: String,
37+
extraCometConfigs: Map[String, String] = Map.empty)
38+
39+
/**
40+
* Benchmark to measure performance of Comet map expressions. To run this benchmark:
41+
* {{{
42+
* SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometMapExpressionBenchmark
43+
* }}}
44+
* Results will be written to "spark/benchmarks/CometMapExpressionBenchmark-**results.txt".
45+
*/
46+
object CometMapExpressionBenchmark extends CometBenchmarkBase {
47+
48+
private val mapExpressions = List(
49+
MapExprConfig(
50+
"create_map",
51+
"select map(c1, c1, c2, c2, c3, c3, c4, c4, c5, c5) from parquetV1Table"))
52+
53+
override def runCometBenchmark(args: Array[String]): Unit = {
54+
runBenchmarkWithTable("Map expressions", 1024) { v =>
55+
withTempPath { dir =>
56+
withTempTable("parquetV1Table") {
57+
prepareTable(
58+
dir,
59+
spark.sql(
60+
s"SELECT " +
61+
s"(value + 0) AS C1, " +
62+
s"(value + 10) AS C2, " +
63+
s"(value + 20) AS C3, " +
64+
s"(value + 30) AS C4, " +
65+
s"(value + 40) AS C5 FROM $tbl"))
66+
67+
val extraConfigs = Map(CometConf.COMET_CASE_CONVERSION_ENABLED.key -> "true")
68+
69+
mapExpressions.foreach { config =>
70+
val allConfigs = extraConfigs ++ config.extraCometConfigs
71+
runBenchmark(config.name) {
72+
runExpressionBenchmark(config.name, v, config.query, allConfigs)
73+
}
74+
}
75+
}
76+
}
77+
}
78+
}
79+
}

0 commit comments

Comments
 (0)