Skip to content

Commit 69fd2bb

Browse files
author
Bhargava Vadlamani
committed
add_benches_rebase_main
1 parent ad30bd5 commit 69fd2bb

2 files changed

Lines changed: 189 additions & 5 deletions

File tree

spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala

Lines changed: 75 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,10 @@ class CometJoinSuite extends CometTestBase {
3838
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
3939
pos: Position): Unit = {
4040
super.test(testName, testTags: _*) {
41-
withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
41+
withSQLConf(
42+
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
43+
CometConf.COMET_EXEC_BROADCAST_NESTED_LOOP_JOIN_ENABLED.key -> "true",
44+
CometConf.getOperatorAllowIncompatConfigKey("BroadcastNestedLoopJoinExec") -> "true") {
4245
testFun
4346
}
4447
}
@@ -704,10 +707,7 @@ class CometJoinSuite extends CometTestBase {
704707
}
705708

706709
test("BroadcastNestedLoopJoin with unequal filter") {
707-
withSQLConf(
708-
CometConf.COMET_EXEC_BROADCAST_NESTED_LOOP_JOIN_ENABLED.key -> "true",
709-
CometConf.getOperatorAllowIncompatConfigKey("BroadcastNestedLoopJoinExec") -> "true",
710-
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
710+
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
711711
withParquetTable((0 until 100).map(i => (i, i % 5)), "tbl_a") {
712712
withParquetTable((0 until 10).map(i => (i, i + 5)), "tbl_b") {
713713
val df =
@@ -719,4 +719,74 @@ class CometJoinSuite extends CometTestBase {
719719
}
720720
}
721721
}
722+
723+
test("BroadcastNestedLoopJoin cross join with count-only output") {
724+
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
725+
withParquetTable((0 until 100).map(i => (i, i % 5)), "tbl_a") {
726+
withParquetTable((0 until 5).map(i => (i, s"w_$i")), "tbl_b") {
727+
val df = sql("SELECT /*+ BROADCAST(tbl_b) */ count(*) FROM tbl_a, tbl_b")
728+
checkSparkAnswerAndOperator(
729+
df,
730+
Seq(classOf[CometBroadcastExchangeExec], classOf[CometBroadcastNestedLoopJoinExec]))
731+
}
732+
}
733+
}
734+
}
735+
736+
test("BroadcastNestedLoopJoin LEFT OUTER with inequality") {
737+
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
738+
withParquetTable((0 until 100).map(i => (i, i % 5)), "tbl_a") {
739+
withParquetTable((0 until 10).map(i => (i, i + 5)), "tbl_b") {
740+
val df =
741+
sql(
742+
"SELECT /*+ BROADCAST(tbl_b) */ * FROM tbl_a LEFT JOIN tbl_b ON tbl_a._1 > tbl_b._1")
743+
checkSparkAnswerAndOperator(
744+
df,
745+
Seq(classOf[CometBroadcastExchangeExec], classOf[CometBroadcastNestedLoopJoinExec]))
746+
}
747+
}
748+
}
749+
}
750+
751+
test("BroadcastNestedLoopJoin RIGHT OUTER with inequality") {
752+
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
753+
withParquetTable((0 until 100).map(i => (i, i % 5)), "tbl_a") {
754+
withParquetTable((0 until 10).map(i => (i, i + 5)), "tbl_b") {
755+
val df =
756+
sql("SELECT /*+ BROADCAST(tbl_b) */ * FROM tbl_a RIGHT JOIN tbl_b ON tbl_a._1 > tbl_b._1")
757+
checkSparkAnswerAndOperator(
758+
df,
759+
Seq(classOf[CometBroadcastExchangeExec], classOf[CometBroadcastNestedLoopJoinExec]))
760+
}
761+
}
762+
}
763+
}
764+
765+
test("BroadcastNestedLoopJoin LEFT SEMI with inequality") {
766+
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
767+
withParquetTable((0 until 100).map(i => (i, i % 5)), "tbl_a") {
768+
withParquetTable((0 until 10).map(i => (i, i + 5)), "tbl_b") {
769+
val df =
770+
sql("SELECT /*+ BROADCAST(tbl_b) */ * FROM tbl_a LEFT SEMI JOIN tbl_b ON tbl_a._1 > tbl_b._1")
771+
checkSparkAnswerAndOperator(
772+
df,
773+
Seq(classOf[CometBroadcastExchangeExec], classOf[CometBroadcastNestedLoopJoinExec]))
774+
}
775+
}
776+
}
777+
}
778+
779+
test("BroadcastNestedLoopJoin LEFT ANTI with inequality") {
780+
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
781+
withParquetTable((0 until 100).map(i => (i, i % 5)), "tbl_a") {
782+
withParquetTable((0 until 10).map(i => (i, i + 5)), "tbl_b") {
783+
val df =
784+
sql("SELECT /*+ BROADCAST(tbl_b) */ * FROM tbl_a LEFT ANTI JOIN tbl_b ON tbl_a._1 > tbl_b._1")
785+
checkSparkAnswerAndOperator(
786+
df,
787+
Seq(classOf[CometBroadcastExchangeExec], classOf[CometBroadcastNestedLoopJoinExec]))
788+
}
789+
}
790+
}
791+
}
722792
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.benchmark
21+
22+
import org.apache.spark.SparkConf
23+
import org.apache.spark.sql.SparkSession
24+
import org.apache.spark.sql.internal.SQLConf
25+
26+
import org.apache.comet.{CometConf, CometSparkSessionExtensions}
27+
28+
/**
29+
* Benchmark to measure performance of Comet BroadcastNestedLoopJoin. To run this benchmark:
30+
* {{{
31+
* SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometBroadcastNestedLoopJoinBenchmark
32+
* }}}
33+
* Results will be written to
34+
* "spark/benchmarks/CometBroadcastNestedLoopJoinBenchmark-**results.txt".
35+
*/
36+
object CometBroadcastNestedLoopJoinBenchmark extends CometBenchmarkBase {
37+
38+
override def getSparkSession: SparkSession = {
39+
val conf = new SparkConf()
40+
.setAppName("CometBroadcastNestedLoopJoinBenchmark")
41+
.set("spark.master", "local[5]")
42+
.setIfMissing("spark.driver.memory", "3g")
43+
.setIfMissing("spark.executor.memory", "3g")
44+
.set(
45+
"spark.shuffle.manager",
46+
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
47+
48+
val sparkSession = SparkSession.builder
49+
.config(conf)
50+
.withExtensions(new CometSparkSessionExtensions)
51+
.getOrCreate()
52+
53+
sparkSession.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "true")
54+
sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
55+
sparkSession.conf.set(CometConf.COMET_ENABLED.key, "false")
56+
sparkSession.conf.set(CometConf.COMET_EXEC_ENABLED.key, "false")
57+
sparkSession.conf.set(SQLConf.ANSI_ENABLED.key, "false")
58+
sparkSession.conf.set("spark.sql.shuffle.partitions", "2")
59+
60+
sparkSession
61+
}
62+
63+
private val cometConfigs: Map[String, String] = Map.empty[String, String]
64+
65+
override def runCometBenchmark(mainArgs: Array[String]): Unit = {
66+
val probeRows = 1024 * 1024
67+
val buildRows = 100
68+
69+
withTempPath { dir =>
70+
withTempTable("probe", "build") {
71+
spark
72+
.range(probeRows)
73+
.selectExpr("id AS k", "id % 100 AS v")
74+
.write
75+
.parquet(s"${dir.getAbsolutePath}/probe")
76+
spark
77+
.range(buildRows)
78+
.selectExpr("id * 1000 AS lo", "id * 1000 + 500 AS hi")
79+
.write
80+
.parquet(s"${dir.getAbsolutePath}/build")
81+
82+
spark.read.parquet(s"${dir.getAbsolutePath}/probe").createOrReplaceTempView("probe")
83+
spark.read.parquet(s"${dir.getAbsolutePath}/build").createOrReplaceTempView("build")
84+
85+
runBenchmark("BroadcastNestedLoopJoin - range") {
86+
runExpressionBenchmark(
87+
"range join (BETWEEN)",
88+
probeRows,
89+
"SELECT count(*) FROM probe p " +
90+
"JOIN build b ON p.k BETWEEN b.lo AND b.hi",
91+
cometConfigs)
92+
}
93+
94+
runBenchmark("BroadcastNestedLoopJoin - inequality") {
95+
runExpressionBenchmark(
96+
"inequality join (>)",
97+
probeRows,
98+
"SELECT count(*) FROM probe p " +
99+
"JOIN build b ON p.k > b.lo",
100+
cometConfigs)
101+
}
102+
103+
runBenchmark("BroadcastNestedLoopJoin - left outer with non-equi") {
104+
runExpressionBenchmark(
105+
"left outer non-equi",
106+
probeRows,
107+
"SELECT count(*) FROM probe p " +
108+
"LEFT OUTER JOIN build b ON p.k BETWEEN b.lo AND b.hi",
109+
cometConfigs)
110+
}
111+
}
112+
}
113+
}
114+
}

0 commit comments

Comments
 (0)