Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions .github/workflows/spark_sql_test_reusable.yml
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,9 @@ jobs:
module:
- {name: "catalyst", args1: "catalyst/test", args2: ""}
# sql_core-* set HEAP_SIZE / METASPACE_SIZE so SparkBuild.scala caps
- {name: "sql_core-1", args1: "", args2: "sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest", heap: "4g", metaspace: "1g"}
- {name: "sql_core-2", args1: "", args2: "sql/testOnly * -- -n org.apache.spark.tags.ExtendedSQLTest", heap: "4g", metaspace: "1g"}
- {name: "sql_core-3", args1: "", args2: "sql/testOnly * -- -n org.apache.spark.tags.SlowSQLTest", heap: "4g", metaspace: "1g"}
- {name: "sql_core-1", args1: "", args2: "sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest", heap: "3g", metaspace: "1g"}
- {name: "sql_core-2", args1: "", args2: "sql/testOnly * -- -n org.apache.spark.tags.ExtendedSQLTest", heap: "3g", metaspace: "1g"}
- {name: "sql_core-3", args1: "", args2: "sql/testOnly * -- -n org.apache.spark.tags.SlowSQLTest", heap: "3g", metaspace: "1g"}
- {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"}
- {name: "sql_hive-2", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.ExtendedHiveTest"}
- {name: "sql_hive-3", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest"}
Expand Down Expand Up @@ -223,9 +223,7 @@ jobs:
if [ "${{ inputs.spark-short }}" != "4.0" ] || [ "${{ inputs.java }}" != "21" ]; then
export SERIAL_SBT_TESTS=1
fi
# Per-row forked-test-JVM caps (read by Spark's SparkBuild.scala). Only
# exported when the matrix entry sets them; rows without these fields
# keep Spark's defaults (-Xmx4g, -XX:MaxMetaspaceSize=1300m).
# Per-row forked-test-JVM caps (read by Spark's SparkBuild.scala).
if [ -n "${{ matrix.module.heap }}" ]; then
export HEAP_SIZE="${{ matrix.module.heap }}"
fi
Expand All @@ -247,10 +245,23 @@ jobs:
# The build-jvm job pre-compiled Spark sources and Test classes, so
# SBT here only orchestrates `testOnly` — Zinc verifies "no changes"
# against the unpacked apache-spark/ tree and skips compilation. We
# cap SBT heap at 1536 MB (down from 3072 MB) so the freed RAM goes
# cap SBT heap so the freed RAM goes
# to the forked test JVM and OS/container overhead, fixing the
# cgroup-OOM SIGKILLs we saw on sql_core-* under 7 GB runners.
SBT_MEM: "1536"
SBT_MEM: "1024"
# G1GC + tuning for the SBT orchestrator JVM. -Xss4m replaces the
# launcher's -Xss64m default (no compile here, deep recursion not
# needed). UseStringDeduplication and MaxMetaspaceSize cap real
# and ceiling footprint. ExitOnOutOfMemoryError fails fast.
SBT_OPTS: >-
-Xss4m
-XX:+UseG1GC
-XX:+UseStringDeduplication
-XX:MaxMetaspaceSize=384m
-XX:G1HeapRegionSize=2m
-XX:InitiatingHeapOccupancyPercent=35
-XX:+ParallelRefProcEnabled
-XX:+ExitOnOutOfMemoryError
# Mirror Spark's own JDK 21 / 25 CI workaround. apache/spark's
# build_java21.yml and build_java25.yml set this same env var to
# process-isolate the V1/V2 Parquet and Orc source suites because
Expand Down
6 changes: 3 additions & 3 deletions dev/diffs/3.4.3.diff
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
diff --git a/pom.xml b/pom.xml
index d3544881af1..1126f287096 100644
index d3544881af1..aae0ae3b27b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,6 +148,8 @@
Expand Down Expand Up @@ -2977,7 +2977,7 @@ index dd55fcfe42c..d9a3f2df535 100644

spark.internalCreateDataFrame(withoutFilters.execute(), schema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index ed2e309fa07..25b798d2c1c 100644
index ed2e309fa07..0658bfe9e12 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -74,6 +74,20 @@ trait SharedSparkSessionBase
Expand All @@ -2996,7 +2996,7 @@ index ed2e309fa07..25b798d2c1c 100644
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ .set("spark.comet.memoryOverhead", "10g")
+ .set("spark.comet.memoryOverhead", "2g")
+ }
conf.set(
StaticSQLConf.WAREHOUSE_PATH,
Expand Down
6 changes: 3 additions & 3 deletions dev/diffs/3.5.8.diff
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
diff --git a/pom.xml b/pom.xml
index edd2ad57880..15a0947abf4 100644
index edd2ad57880..a47b7dec672 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,6 +152,8 @@
Expand Down Expand Up @@ -2926,7 +2926,7 @@ index e937173a590..263934fbe7b 100644

spark.internalCreateDataFrame(withoutFilters.execute(), schema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index ed2e309fa07..25b798d2c1c 100644
index ed2e309fa07..0658bfe9e12 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -74,6 +74,20 @@ trait SharedSparkSessionBase
Expand All @@ -2945,7 +2945,7 @@ index ed2e309fa07..25b798d2c1c 100644
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ .set("spark.comet.memoryOverhead", "10g")
+ .set("spark.comet.memoryOverhead", "2g")
+ }
conf.set(
StaticSQLConf.WAREHOUSE_PATH,
Expand Down
6 changes: 3 additions & 3 deletions dev/diffs/4.0.2.diff
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ index 6c51bd4ff2e..e72ec1d26e2 100644
withSpark(sc) { sc =>
TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
diff --git a/pom.xml b/pom.xml
index 252cfdf9073..64e899efe6b 100644
index 252cfdf9073..60cb9dcb7cf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,6 +148,8 @@
Expand Down Expand Up @@ -3590,7 +3590,7 @@ index f0f3f94b811..b7d18771314 100644

spark.internalCreateDataFrame(withoutFilters.execute(), schema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index 245219c1756..b566f970ccd 100644
index 245219c1756..2213d438a0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -75,6 +75,21 @@ trait SharedSparkSessionBase
Expand All @@ -3609,7 +3609,7 @@ index 245219c1756..b566f970ccd 100644
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ .set("spark.comet.memoryOverhead", "10g")
+ .set("spark.comet.memoryOverhead", "2g")
+
+ }
conf.set(
Expand Down
23 changes: 12 additions & 11 deletions dev/diffs/4.1.2.diff
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ index 6df8bc85b51..dabb75e2b75 100644
withSpark(sc) { sc =>
TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
diff --git a/pom.xml b/pom.xml
index dc201151999..3e278cfb34c 100644
index dc201151999..d5c08f11ded 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,6 +152,8 @@
Expand Down Expand Up @@ -885,7 +885,7 @@ index 53e47f428c3..a55d8f0c161 100644
assert(shuffleMergeJoins.size == 1)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 885512d4d19..113ae17ad9f 100644
index 885512d4d19..09b1ccaed71 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -29,7 +29,8 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
Expand Down Expand Up @@ -1055,7 +1055,7 @@ index 885512d4d19..113ae17ad9f 100644
// No extra shuffle before aggregation
assert(collect(plan) { case _: ShuffleExchangeExec => true }.size === 0)
}
@@ -1501,7 +1523,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1501,7 +1526,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
val plan = sql(getAggQuery(selectExpr, joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
// Have shuffle before aggregation
Expand All @@ -1065,7 +1065,8 @@ index 885512d4d19..113ae17ad9f 100644
}

def getJoinQuery(selectExpr: String, joinType: String): String = {
@@ -1530,9 +1556,15 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1529,10 +1555,16 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
"/*+ BROADCAST(right_t) */ k1 as k0"
}
val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan
- assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
Expand All @@ -1084,7 +1085,7 @@ index 885512d4d19..113ae17ad9f 100644
}

// Test output ordering is not preserved
@@ -1541,9 +1567,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1541,9 +1573,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0"
val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
Expand All @@ -1099,7 +1100,7 @@ index 885512d4d19..113ae17ad9f 100644
}

// Test singe partition
@@ -1553,7 +1582,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1553,7 +1588,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
|FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2
|""".stripMargin)
val plan = fullJoinDF.queryExecution.executedPlan
Expand All @@ -1109,7 +1110,7 @@ index 885512d4d19..113ae17ad9f 100644
checkAnswer(fullJoinDF, Row(100))
}
}
@@ -1626,6 +1656,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1626,6 +1662,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
Seq(semiJoinDF, antiJoinDF).foreach { df =>
assert(collect(df.queryExecution.executedPlan) {
case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == ignoreDuplicatedKey => true
Expand All @@ -1119,7 +1120,7 @@ index 885512d4d19..113ae17ad9f 100644
}.size == 1)
}
}
@@ -1670,14 +1703,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1670,14 +1709,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan

test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)") {
def check(plan: SparkPlan): Unit = {
Expand All @@ -1142,7 +1143,7 @@ index 885512d4d19..113ae17ad9f 100644
}
dupStreamSideColTest("SHUFFLE_HASH", check)
}
@@ -1813,7 +1852,8 @@ class ThreadLeakInSortMergeJoinSuite
@@ -1813,7 +1858,8 @@ class ThreadLeakInSortMergeJoinSuite
sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
}

Expand Down Expand Up @@ -3849,7 +3850,7 @@ index f0f3f94b811..b7d18771314 100644

spark.internalCreateDataFrame(withoutFilters.execute(), schema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index 720b13b812e..71b20c79a12 100644
index 720b13b812e..d08c2548ffa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -98,6 +98,21 @@ trait SharedSparkSessionBase
Expand All @@ -3868,7 +3869,7 @@ index 720b13b812e..71b20c79a12 100644
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ .set("spark.comet.memoryOverhead", "10g")
+ .set("spark.comet.memoryOverhead", "2g")
+
+ }
conf.set(
Expand Down
Loading