Skip to content

Commit 95be2c3

Browse files
authored
chore: Add join benchmarks (#4598)
* add_benchmarks_native_joins
1 parent fdc965a commit 95be2c3

3 files changed

Lines changed: 379 additions & 0 deletions

File tree

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.benchmark
21+
22+
import org.apache.spark.SparkConf
23+
import org.apache.spark.sql.SparkSession
24+
import org.apache.spark.sql.internal.SQLConf
25+
26+
import org.apache.comet.CometSparkSessionExtensions
27+
28+
/**
29+
* Benchmark to measure performance of Comet's BroadcastHashJoin across common Spark join shapes
30+
* (Inner count, Inner projected, LEFT OUTER, LEFT SEMI, RIGHT OUTER). To run:
31+
* {{{
32+
* SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometBroadcastHashJoinBenchmark
33+
* }}}
34+
*/
35+
object CometBroadcastHashJoinBenchmark extends CometBenchmarkBase {
36+
37+
override def getSparkSession: SparkSession = {
38+
val conf = new SparkConf()
39+
.setAppName("CometBroadcastHashJoinBenchmark")
40+
.set("spark.master", "local[5]")
41+
.setIfMissing("spark.driver.memory", "3g")
42+
.setIfMissing("spark.executor.memory", "3g")
43+
.set(
44+
"spark.shuffle.manager",
45+
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
46+
47+
val sparkSession = SparkSession.builder
48+
.config(conf)
49+
.withExtensions(new CometSparkSessionExtensions)
50+
.getOrCreate()
51+
sparkSession.conf.set("spark.sql.shuffle.partitions", "2")
52+
sparkSession
53+
}
54+
55+
// Force BroadcastHashJoin: small build below the 10MB threshold.
56+
private val cometConfigs: Map[String, String] = Map(
57+
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB",
58+
SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB")
59+
60+
override def runCometBenchmark(mainArgs: Array[String]): Unit = {
61+
val probeRows = 1024 * 1024
62+
val buildRows = 100000
63+
64+
withTempPath { dir =>
65+
withTempTable("probe", "build") {
66+
spark
67+
.range(probeRows)
68+
.selectExpr("id AS k", "id % 100 AS v")
69+
.write
70+
.parquet(s"${dir.getAbsolutePath}/probe")
71+
spark
72+
.range(buildRows)
73+
.selectExpr("id AS k", "id * 10 AS w")
74+
.write
75+
.parquet(s"${dir.getAbsolutePath}/build")
76+
77+
spark.read.parquet(s"${dir.getAbsolutePath}/probe").createOrReplaceTempView("probe")
78+
spark.read.parquet(s"${dir.getAbsolutePath}/build").createOrReplaceTempView("build")
79+
80+
runBenchmark("BroadcastHashJoin - inner count") {
81+
runExpressionBenchmark(
82+
"inner count",
83+
probeRows,
84+
"SELECT /*+ BROADCAST(b) */ count(*) FROM probe p JOIN build b ON p.k = b.k",
85+
cometConfigs)
86+
}
87+
88+
runBenchmark("BroadcastHashJoin - inner projected") {
89+
runExpressionBenchmark(
90+
"inner projected",
91+
probeRows,
92+
"SELECT /*+ BROADCAST(b) */ p.k, p.v, b.w FROM probe p JOIN build b ON p.k = b.k",
93+
cometConfigs)
94+
}
95+
96+
runBenchmark("BroadcastHashJoin - left outer") {
97+
runExpressionBenchmark(
98+
"left outer",
99+
probeRows,
100+
"SELECT /*+ BROADCAST(b) */ count(*) FROM probe p LEFT JOIN build b ON p.k = b.k",
101+
cometConfigs)
102+
}
103+
104+
runBenchmark("BroadcastHashJoin - left semi") {
105+
runExpressionBenchmark(
106+
"left semi",
107+
probeRows,
108+
"SELECT /*+ BROADCAST(b) */ count(*) FROM probe p LEFT SEMI JOIN build b ON p.k = b.k",
109+
cometConfigs)
110+
}
111+
112+
runBenchmark("BroadcastHashJoin - right outer") {
113+
runExpressionBenchmark(
114+
"right outer",
115+
probeRows,
116+
"SELECT /*+ BROADCAST(p) */ count(*) FROM probe p RIGHT JOIN build b ON p.k = b.k",
117+
cometConfigs)
118+
}
119+
}
120+
}
121+
}
122+
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.benchmark
21+
22+
import org.apache.spark.SparkConf
23+
import org.apache.spark.sql.SparkSession
24+
import org.apache.spark.sql.internal.SQLConf
25+
26+
import org.apache.comet.{CometConf, CometSparkSessionExtensions}
27+
28+
/**
29+
* Benchmark to measure performance of Comet's ShuffledHashJoin across common Spark join shapes
30+
* (Inner count, Inner projected, LEFT OUTER, LEFT SEMI, RIGHT OUTER). To run:
31+
* {{{
32+
* SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometHashJoinBenchmark
33+
* }}}
34+
*/
35+
object CometHashJoinBenchmark extends CometBenchmarkBase {
36+
37+
override def getSparkSession: SparkSession = {
38+
val conf = new SparkConf()
39+
.setAppName("CometHashJoinBenchmark")
40+
.set("spark.master", "local[5]")
41+
.setIfMissing("spark.driver.memory", "3g")
42+
.setIfMissing("spark.executor.memory", "3g")
43+
.set(
44+
"spark.shuffle.manager",
45+
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
46+
47+
val sparkSession = SparkSession.builder
48+
.config(conf)
49+
.withExtensions(new CometSparkSessionExtensions)
50+
.getOrCreate()
51+
52+
sparkSession.conf.set(CometConf.COMET_ENABLED.key, "false")
53+
sparkSession.conf.set(CometConf.COMET_EXEC_ENABLED.key, "false")
54+
sparkSession.conf.set(SQLConf.ANSI_ENABLED.key, "false")
55+
sparkSession.conf.set("spark.sql.shuffle.partitions", "2")
56+
57+
sparkSession
58+
}
59+
60+
// Force ShuffledHashJoin: disable broadcast and prefer-SMJ, enable force-SHJ.
61+
private val cometConfigs: Map[String, String] = Map(
62+
SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
63+
"spark.sql.join.forceApplyShuffledHashJoin" -> "true",
64+
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
65+
SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1")
66+
67+
override def runCometBenchmark(mainArgs: Array[String]): Unit = {
68+
val probeRows = 1024 * 1024
69+
val buildRows = 100000
70+
71+
withTempPath { dir =>
72+
withTempTable("probe", "build") {
73+
spark
74+
.range(probeRows)
75+
.selectExpr("id AS k", "id % 100 AS v")
76+
.write
77+
.parquet(s"${dir.getAbsolutePath}/probe")
78+
spark
79+
.range(buildRows)
80+
.selectExpr("id AS k", "id * 10 AS w")
81+
.write
82+
.parquet(s"${dir.getAbsolutePath}/build")
83+
84+
spark.read.parquet(s"${dir.getAbsolutePath}/probe").createOrReplaceTempView("probe")
85+
spark.read.parquet(s"${dir.getAbsolutePath}/build").createOrReplaceTempView("build")
86+
87+
runBenchmark("ShuffledHashJoin - inner count") {
88+
runExpressionBenchmark(
89+
"inner count",
90+
probeRows,
91+
"SELECT /*+ SHUFFLE_HASH(b) */ count(*) FROM probe p JOIN build b ON p.k = b.k",
92+
cometConfigs)
93+
}
94+
95+
runBenchmark("ShuffledHashJoin - inner projected") {
96+
runExpressionBenchmark(
97+
"inner projected",
98+
probeRows,
99+
"SELECT /*+ SHUFFLE_HASH(b) */ p.k, p.v, b.w FROM probe p JOIN build b ON p.k = b.k",
100+
cometConfigs)
101+
}
102+
103+
runBenchmark("ShuffledHashJoin - left outer") {
104+
runExpressionBenchmark(
105+
"left outer",
106+
probeRows,
107+
"SELECT /*+ SHUFFLE_HASH(b) */ count(*) FROM probe p LEFT JOIN build b ON p.k = b.k",
108+
cometConfigs)
109+
}
110+
111+
runBenchmark("ShuffledHashJoin - left semi") {
112+
runExpressionBenchmark(
113+
"left semi",
114+
probeRows,
115+
"SELECT /*+ SHUFFLE_HASH(b) */ count(*) FROM probe p LEFT SEMI JOIN build b ON p.k = b.k",
116+
cometConfigs)
117+
}
118+
119+
runBenchmark("ShuffledHashJoin - right outer") {
120+
runExpressionBenchmark(
121+
"right outer",
122+
probeRows,
123+
"SELECT /*+ SHUFFLE_HASH(p) */ count(*) FROM probe p RIGHT JOIN build b ON p.k = b.k",
124+
cometConfigs)
125+
}
126+
}
127+
}
128+
}
129+
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.benchmark
21+
22+
import org.apache.spark.SparkConf
23+
import org.apache.spark.sql.SparkSession
24+
import org.apache.spark.sql.internal.SQLConf
25+
26+
import org.apache.comet.{CometConf, CometSparkSessionExtensions}
27+
28+
/**
29+
* Benchmark to measure performance of Comet's SortMergeJoin across common Spark join shapes
30+
* (Inner count, Inner projected, LEFT OUTER, LEFT SEMI, RIGHT OUTER). To run:
31+
* {{{
32+
* SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometSortMergeJoinBenchmark
33+
* }}}
34+
*/
35+
object CometSortMergeJoinBenchmark extends CometBenchmarkBase {
36+
37+
override def getSparkSession: SparkSession = {
38+
val conf = new SparkConf()
39+
.setAppName("CometSortMergeJoinBenchmark")
40+
.set("spark.master", "local[5]")
41+
.setIfMissing("spark.driver.memory", "3g")
42+
.setIfMissing("spark.executor.memory", "3g")
43+
.set(
44+
"spark.shuffle.manager",
45+
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
46+
47+
val sparkSession = SparkSession.builder
48+
.config(conf)
49+
.withExtensions(new CometSparkSessionExtensions)
50+
.getOrCreate()
51+
52+
sparkSession.conf.set(CometConf.COMET_ENABLED.key, "false")
53+
sparkSession.conf.set(CometConf.COMET_EXEC_ENABLED.key, "false")
54+
sparkSession.conf.set(SQLConf.ANSI_ENABLED.key, "false")
55+
sparkSession.conf.set("spark.sql.shuffle.partitions", "2")
56+
57+
sparkSession
58+
}
59+
60+
// Force SortMergeJoin: prefer SMJ, disable broadcast.
61+
private val cometConfigs: Map[String, String] = Map(
62+
SQLConf.PREFER_SORTMERGEJOIN.key -> "true",
63+
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
64+
SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1")
65+
66+
override def runCometBenchmark(mainArgs: Array[String]): Unit = {
67+
val probeRows = 1024 * 1024
68+
val buildRows = 100000
69+
70+
withTempPath { dir =>
71+
withTempTable("probe", "build") {
72+
spark
73+
.range(probeRows)
74+
.selectExpr("id AS k", "id % 100 AS v")
75+
.write
76+
.parquet(s"${dir.getAbsolutePath}/probe")
77+
spark
78+
.range(buildRows)
79+
.selectExpr("id AS k", "id * 10 AS w")
80+
.write
81+
.parquet(s"${dir.getAbsolutePath}/build")
82+
83+
spark.read.parquet(s"${dir.getAbsolutePath}/probe").createOrReplaceTempView("probe")
84+
spark.read.parquet(s"${dir.getAbsolutePath}/build").createOrReplaceTempView("build")
85+
86+
runBenchmark("SortMergeJoin - inner count") {
87+
runExpressionBenchmark(
88+
"inner count",
89+
probeRows,
90+
"SELECT /*+ MERGE(b) */ count(*) FROM probe p JOIN build b ON p.k = b.k",
91+
cometConfigs)
92+
}
93+
94+
runBenchmark("SortMergeJoin - inner projected") {
95+
runExpressionBenchmark(
96+
"inner projected",
97+
probeRows,
98+
"SELECT /*+ MERGE(b) */ p.k, p.v, b.w FROM probe p JOIN build b ON p.k = b.k",
99+
cometConfigs)
100+
}
101+
102+
runBenchmark("SortMergeJoin - left outer") {
103+
runExpressionBenchmark(
104+
"left outer",
105+
probeRows,
106+
"SELECT /*+ MERGE(b) */ count(*) FROM probe p LEFT JOIN build b ON p.k = b.k",
107+
cometConfigs)
108+
}
109+
110+
runBenchmark("SortMergeJoin - left semi") {
111+
runExpressionBenchmark(
112+
"left semi",
113+
probeRows,
114+
"SELECT /*+ MERGE(b) */ count(*) FROM probe p LEFT SEMI JOIN build b ON p.k = b.k",
115+
cometConfigs)
116+
}
117+
118+
runBenchmark("SortMergeJoin - right outer") {
119+
runExpressionBenchmark(
120+
"right outer",
121+
probeRows,
122+
"SELECT /*+ MERGE(p) */ count(*) FROM probe p RIGHT JOIN build b ON p.k = b.k",
123+
cometConfigs)
124+
}
125+
}
126+
}
127+
}
128+
}

0 commit comments

Comments
 (0)