Skip to content

Commit ecbaa7f

Browse files
authored
Add ANSI support to aggregate microbenchmarks (apache#2901)
1 parent 9095c86 commit ecbaa7f

1 file changed

Lines changed: 102 additions & 65 deletions

File tree

spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala

Lines changed: 102 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919

2020
package org.apache.spark.sql.benchmark
2121

22+
import scala.util.Try
23+
2224
import org.apache.spark.benchmark.Benchmark
2325
import org.apache.spark.sql.SparkSession
26+
import org.apache.spark.sql.internal.SQLConf
2427
import org.apache.spark.sql.types.DecimalType
2528

2629
import org.apache.comet.CometConf
@@ -52,7 +55,8 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
5255
BenchAggregateFunction("MIN"),
5356
BenchAggregateFunction("MAX"),
5457
BenchAggregateFunction("COUNT"),
55-
BenchAggregateFunction("COUNT", distinct = true))
58+
BenchAggregateFunction("COUNT", distinct = true),
59+
BenchAggregateFunction("AVG"))
5660

5761
def aggFunctionSQL(aggregateFunction: BenchAggregateFunction, input: String): String = {
5862
s"${aggregateFunction.name}(${if (aggregateFunction.distinct) s"DISTINCT $input" else input})"
@@ -61,11 +65,12 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
6165
def singleGroupAndAggregate(
6266
values: Int,
6367
groupingKeyCardinality: Int,
64-
aggregateFunction: BenchAggregateFunction): Unit = {
68+
aggregateFunction: BenchAggregateFunction,
69+
isAnsiMode: Boolean): Unit = {
6570
val benchmark =
6671
new Benchmark(
6772
s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCardinality), " +
68-
s"single aggregate ${aggregateFunction.toString}",
73+
s"single aggregate ${aggregateFunction.toString}, ansi mode enabled : ${isAnsiMode}",
6974
values,
7075
output = output)
7176

@@ -78,16 +83,23 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
7883
val functionSQL = aggFunctionSQL(aggregateFunction, "value")
7984
val query = s"SELECT key, $functionSQL FROM parquetV1Table GROUP BY key"
8085

81-
benchmark.addCase(s"SQL Parquet - Spark (${aggregateFunction.toString})") { _ =>
82-
spark.sql(query).noop()
86+
benchmark.addCase(
87+
s"SQL Parquet - Spark (${aggregateFunction.toString}) ansi mode enabled : ${isAnsiMode}") {
88+
_ =>
89+
withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
90+
Try { spark.sql(query).noop() }
91+
}
8392
}
8493

85-
benchmark.addCase(s"SQL Parquet - Comet (${aggregateFunction.toString})") { _ =>
86-
withSQLConf(
87-
CometConf.COMET_ENABLED.key -> "true",
88-
CometConf.COMET_EXEC_ENABLED.key -> "true") {
89-
spark.sql(query).noop()
90-
}
94+
benchmark.addCase(
95+
s"SQL Parquet - Comet (${aggregateFunction.toString}) ansi mode enabled : ${isAnsiMode}") {
96+
_ =>
97+
withSQLConf(
98+
CometConf.COMET_ENABLED.key -> "true",
99+
CometConf.COMET_EXEC_ENABLED.key -> "true",
100+
SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
101+
Try { spark.sql(query).noop() }
102+
}
91103
}
92104

93105
benchmark.run()
@@ -99,7 +111,8 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
99111
values: Int,
100112
dataType: DecimalType,
101113
groupingKeyCardinality: Int,
102-
aggregateFunction: BenchAggregateFunction): Unit = {
114+
aggregateFunction: BenchAggregateFunction,
115+
isAnsiMode: Boolean): Unit = {
103116
val benchmark =
104117
new Benchmark(
105118
s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCardinality), " +
@@ -120,16 +133,23 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
120133
val functionSQL = aggFunctionSQL(aggregateFunction, "value")
121134
val query = s"SELECT key, $functionSQL FROM parquetV1Table GROUP BY key"
122135

123-
benchmark.addCase(s"SQL Parquet - Spark (${aggregateFunction.toString})") { _ =>
124-
spark.sql(query).noop()
136+
withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
137+
benchmark.addCase(
138+
s"SQL Parquet - Spark (${aggregateFunction.toString}) ansi mode enabled : ${isAnsiMode}") {
139+
_ =>
140+
Try { spark.sql(query).noop() }
141+
}
125142
}
126143

127-
benchmark.addCase(s"SQL Parquet - Comet (${aggregateFunction.toString})") { _ =>
128-
withSQLConf(
129-
CometConf.COMET_ENABLED.key -> "true",
130-
CometConf.COMET_EXEC_ENABLED.key -> "true") {
131-
spark.sql(query).noop()
132-
}
144+
benchmark.addCase(
145+
s"SQL Parquet - Comet (${aggregateFunction.toString}) ansi mode enabled : ${isAnsiMode}") {
146+
_ =>
147+
withSQLConf(
148+
CometConf.COMET_ENABLED.key -> "true",
149+
CometConf.COMET_EXEC_ENABLED.key -> "true",
150+
SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
151+
Try { spark.sql(query).noop() }
152+
}
133153
}
134154

135155
benchmark.run()
@@ -140,7 +160,8 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
140160
def multiGroupKeys(
141161
values: Int,
142162
groupingKeyCard: Int,
143-
aggregateFunction: BenchAggregateFunction): Unit = {
163+
aggregateFunction: BenchAggregateFunction,
164+
isAnsiMode: Boolean): Unit = {
144165
val benchmark =
145166
new Benchmark(
146167
s"Grouped HashAgg Exec: multiple group keys (cardinality $groupingKeyCard), " +
@@ -160,17 +181,24 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
160181
val query =
161182
s"SELECT key1, key2, $functionSQL FROM parquetV1Table GROUP BY key1, key2"
162183

163-
benchmark.addCase(s"SQL Parquet - Spark (${aggregateFunction.toString})") { _ =>
164-
spark.sql(query).noop()
184+
benchmark.addCase(
185+
s"SQL Parquet - Spark (${aggregateFunction.toString}) isANSIMode: ${isAnsiMode.toString}") {
186+
_ =>
187+
withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
188+
Try { spark.sql(query).noop() }
189+
}
165190
}
166191

167-
benchmark.addCase(s"SQL Parquet - Comet (${aggregateFunction.toString})") { _ =>
168-
withSQLConf(
169-
CometConf.COMET_ENABLED.key -> "true",
170-
CometConf.COMET_EXEC_ENABLED.key -> "true",
171-
CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key -> "1G") {
172-
spark.sql(query).noop()
173-
}
192+
benchmark.addCase(
193+
s"SQL Parquet - Comet (${aggregateFunction.toString}) isANSIMode: ${isAnsiMode.toString}") {
194+
_ =>
195+
withSQLConf(
196+
CometConf.COMET_ENABLED.key -> "true",
197+
CometConf.COMET_EXEC_ENABLED.key -> "true",
198+
CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key -> "1G",
199+
SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
200+
Try { spark.sql(query).noop() }
201+
}
174202
}
175203

176204
benchmark.run()
@@ -181,11 +209,12 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
181209
def multiAggregates(
182210
values: Int,
183211
groupingKeyCard: Int,
184-
aggregateFunction: BenchAggregateFunction): Unit = {
212+
aggregateFunction: BenchAggregateFunction,
213+
isAnsiMode: Boolean): Unit = {
185214
val benchmark =
186215
new Benchmark(
187216
s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCard), " +
188-
s"multiple aggregates ${aggregateFunction.toString}",
217+
s"multiple aggregates ${aggregateFunction.toString} isANSIMode: ${isAnsiMode.toString}",
189218
values,
190219
output = output)
191220

@@ -203,16 +232,23 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
203232
val query = s"SELECT key, $functionSQL1, $functionSQL2 " +
204233
"FROM parquetV1Table GROUP BY key"
205234

206-
benchmark.addCase(s"SQL Parquet - Spark (${aggregateFunction.toString})") { _ =>
207-
spark.sql(query).noop()
235+
benchmark.addCase(
236+
s"SQL Parquet - Spark (${aggregateFunction.toString}) isANSIMode: ${isAnsiMode.toString}") {
237+
_ =>
238+
withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
239+
Try { spark.sql(query).noop() }
240+
}
208241
}
209242

210-
benchmark.addCase(s"SQL Parquet - Comet (${aggregateFunction.toString})") { _ =>
211-
withSQLConf(
212-
CometConf.COMET_ENABLED.key -> "true",
213-
CometConf.COMET_EXEC_ENABLED.key -> "true") {
214-
spark.sql(query).noop()
215-
}
243+
benchmark.addCase(
244+
s"SQL Parquet - Comet (${aggregateFunction.toString}) isANSIMode: ${isAnsiMode.toString}") {
245+
_ =>
246+
withSQLConf(
247+
CometConf.COMET_ENABLED.key -> "true",
248+
CometConf.COMET_EXEC_ENABLED.key -> "true",
249+
SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
250+
Try { spark.sql(query).noop() }
251+
}
216252
}
217253

218254
benchmark.run()
@@ -223,39 +259,40 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
223259
override def runCometBenchmark(mainArgs: Array[String]): Unit = {
224260
val total = 1024 * 1024 * 10
225261
val combinations = List(100, 1024, 1024 * 1024) // number of distinct groups
226-
227262
benchmarkAggFuncs.foreach { aggFunc =>
228-
runBenchmarkWithTable(
229-
s"Grouped Aggregate (single group key + single aggregate $aggFunc)",
230-
total) { v =>
231-
for (card <- combinations) {
232-
singleGroupAndAggregate(v, card, aggFunc)
263+
Seq(true, false).foreach(k => {
264+
runBenchmarkWithTable(
265+
s"Grouped Aggregate (single group key + single aggregate $aggFunc)",
266+
total) { v =>
267+
for (card <- combinations) {
268+
singleGroupAndAggregate(v, card, aggFunc, k)
269+
}
233270
}
234-
}
235271

236-
runBenchmarkWithTable(
237-
s"Grouped Aggregate (multiple group keys + single aggregate $aggFunc)",
238-
total) { v =>
239-
for (card <- combinations) {
240-
multiGroupKeys(v, card, aggFunc)
272+
runBenchmarkWithTable(
273+
s"Grouped Aggregate (multiple group keys + single aggregate $aggFunc)",
274+
total) { v =>
275+
for (card <- combinations) {
276+
multiGroupKeys(v, card, aggFunc, k)
277+
}
241278
}
242-
}
243279

244-
runBenchmarkWithTable(
245-
s"Grouped Aggregate (single group key + multiple aggregates $aggFunc)",
246-
total) { v =>
247-
for (card <- combinations) {
248-
multiAggregates(v, card, aggFunc)
280+
runBenchmarkWithTable(
281+
s"Grouped Aggregate (single group key + multiple aggregates $aggFunc)",
282+
total) { v =>
283+
for (card <- combinations) {
284+
multiAggregates(v, card, aggFunc, k)
285+
}
249286
}
250-
}
251287

252-
runBenchmarkWithTable(
253-
s"Grouped Aggregate (single group key + single aggregate $aggFunc on decimal)",
254-
total) { v =>
255-
for (card <- combinations) {
256-
singleGroupAndAggregateDecimal(v, DecimalType(18, 10), card, aggFunc)
288+
runBenchmarkWithTable(
289+
s"Grouped Aggregate (single group key + single aggregate $aggFunc on decimal)",
290+
total) { v =>
291+
for (card <- combinations) {
292+
singleGroupAndAggregateDecimal(v, DecimalType(18, 10), card, aggFunc, k)
293+
}
257294
}
258-
}
295+
})
259296
}
260297
}
261298
}

0 commit comments

Comments
 (0)