Skip to content

Commit 8b5f172

Browse files
committed
test: link SPARK-35442 IgnoreComet to #4412 and unignore SPARK-34980/52921
For the SPARK-35442 aggregate and union tests in AdaptiveQueryExecSuite, change the IgnoreComet reason to a link to the new follow-up issue documenting why CometHashAggregateExec is not recognized by AQEPropagateEmptyRelation. For SPARK-34980 (and SPARK-52921 in 4.1.1), drop the IgnoreComet and teach checkResultPartition's union collect to also count CometUnionExec, since Comet converts the union to its own operator when both branches are Comet-compatible. Applies to dev/diffs/{3.4.3,3.5.8,4.0.2,4.1.1}.diff.
1 parent 8f0ca4e commit 8b5f172

4 files changed

Lines changed: 52 additions & 62 deletions

File tree

dev/diffs/3.4.3.diff

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1498,7 +1498,7 @@ index ac710c32296..2854b433dd3 100644
14981498

14991499
import testImplicits._
15001500
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
1501-
index 593bd7bb4ba..842dcf5577d 100644
1501+
index 593bd7bb4ba..7f68e3bd8d3 100644
15021502
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
15031503
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
15041504
@@ -26,9 +26,11 @@ import org.scalatest.time.SpanSugar._
@@ -1772,7 +1772,7 @@ index 593bd7bb4ba..842dcf5577d 100644
17721772

17731773
- test("SPARK-35442: Support propagate empty relation through aggregate") {
17741774
+ test("SPARK-35442: Support propagate empty relation through aggregate",
1775-
+ IgnoreComet("Comet aggregate is not matched by PropagateEmptyRelationAfterAQE")) {
1775+
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4412")) {
17761776
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
17771777
val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult(
17781778
"SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key")
@@ -1782,7 +1782,7 @@ index 593bd7bb4ba..842dcf5577d 100644
17821782

17831783
- test("SPARK-35442: Support propagate empty relation through union") {
17841784
+ test("SPARK-35442: Support propagate empty relation through union",
1785-
+ IgnoreComet("Comet aggregate is not matched by PropagateEmptyRelationAfterAQE")) {
1785+
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4412")) {
17861786
def checkNumUnion(plan: SparkPlan, numUnion: Int): Unit = {
17871787
assert(
17881788
collect(plan) {
@@ -1825,16 +1825,14 @@ index 593bd7bb4ba..842dcf5577d 100644
18251825
}.size == 1)
18261826
checkAnswer(ds, testData)
18271827
}
1828-
@@ -1892,7 +1930,8 @@ class AdaptiveQueryExecSuite
1829-
}
1830-
}
1831-
1832-
- test("SPARK-34980: Support coalesce partition through union") {
1833-
+ test("SPARK-34980: Support coalesce partition through union",
1834-
+ IgnoreComet("Comet changes the plan structure asserted by this test")) {
1835-
def checkResultPartition(
1836-
df: Dataset[Row],
1837-
numUnion: Int,
1828+
@@ -1901,6 +1939,7 @@ class AdaptiveQueryExecSuite
1829+
df.collect()
1830+
assert(collect(df.queryExecution.executedPlan) {
1831+
case u: UnionExec => u
1832+
+ case u: CometUnionExec => u.originalPlan.asInstanceOf[UnionExec]
1833+
}.size == numUnion)
1834+
assert(collect(df.queryExecution.executedPlan) {
1835+
case r: AQEShuffleReadExec => r
18381836
@@ -2028,7 +2067,8 @@ class AdaptiveQueryExecSuite
18391837
}
18401838
}

dev/diffs/3.5.8.diff

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1463,7 +1463,7 @@ index 5a413c77754..207b66e1d7b 100644
14631463

14641464
import testImplicits._
14651465
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
1466-
index 2f8e401e743..f6323db7b2c 100644
1466+
index 2f8e401e743..f408405807b 100644
14671467
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
14681468
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
14691469
@@ -27,9 +27,11 @@ import org.scalatest.time.SpanSugar._
@@ -1744,7 +1744,7 @@ index 2f8e401e743..f6323db7b2c 100644
17441744

17451745
- test("SPARK-35442: Support propagate empty relation through aggregate") {
17461746
+ test("SPARK-35442: Support propagate empty relation through aggregate",
1747-
+ IgnoreComet("Comet aggregate is not matched by PropagateEmptyRelationAfterAQE")) {
1747+
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4412")) {
17481748
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
17491749
val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult(
17501750
"SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key")
@@ -1754,7 +1754,7 @@ index 2f8e401e743..f6323db7b2c 100644
17541754

17551755
- test("SPARK-35442: Support propagate empty relation through union") {
17561756
+ test("SPARK-35442: Support propagate empty relation through union",
1757-
+ IgnoreComet("Comet aggregate is not matched by PropagateEmptyRelationAfterAQE")) {
1757+
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4412")) {
17581758
def checkNumUnion(plan: SparkPlan, numUnion: Int): Unit = {
17591759
assert(
17601760
collect(plan) {
@@ -1797,16 +1797,14 @@ index 2f8e401e743..f6323db7b2c 100644
17971797
}.size == 1)
17981798
checkAnswer(ds, testData)
17991799
}
1800-
@@ -1918,7 +1957,8 @@ class AdaptiveQueryExecSuite
1801-
}
1802-
}
1803-
1804-
- test("SPARK-34980: Support coalesce partition through union") {
1805-
+ test("SPARK-34980: Support coalesce partition through union",
1806-
+ IgnoreComet("Comet changes the plan structure asserted by this test")) {
1807-
def checkResultPartition(
1808-
df: Dataset[Row],
1809-
numUnion: Int,
1800+
@@ -1927,6 +1966,7 @@ class AdaptiveQueryExecSuite
1801+
df.collect()
1802+
assert(collect(df.queryExecution.executedPlan) {
1803+
case u: UnionExec => u
1804+
+ case u: CometUnionExec => u.originalPlan.asInstanceOf[UnionExec]
1805+
}.size == numUnion)
1806+
assert(collect(df.queryExecution.executedPlan) {
1807+
case r: AQEShuffleReadExec => r
18101808
@@ -2054,7 +2094,8 @@ class AdaptiveQueryExecSuite
18111809
}
18121810
}

dev/diffs/4.0.2.diff

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2047,7 +2047,7 @@ index a3cfdc5a240..3793b6191bf 100644
20472047
})
20482048
checkAnswer(distinctWithId, Seq(Row(1, 0), Row(1, 0)))
20492049
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
2050-
index 272be70f9fe..5ea9540ddf8 100644
2050+
index 272be70f9fe..8cea57eef5c 100644
20512051
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
20522052
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
20532053
@@ -28,12 +28,14 @@ import org.apache.spark.SparkException
@@ -2331,7 +2331,7 @@ index 272be70f9fe..5ea9540ddf8 100644
23312331

23322332
- test("SPARK-35442: Support propagate empty relation through aggregate") {
23332333
+ test("SPARK-35442: Support propagate empty relation through aggregate",
2334-
+ IgnoreComet("Comet aggregate is not matched by PropagateEmptyRelationAfterAQE")) {
2334+
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4412")) {
23352335
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
23362336
val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult(
23372337
"SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key")
@@ -2341,7 +2341,7 @@ index 272be70f9fe..5ea9540ddf8 100644
23412341

23422342
- test("SPARK-35442: Support propagate empty relation through union") {
23432343
+ test("SPARK-35442: Support propagate empty relation through union",
2344-
+ IgnoreComet("Comet aggregate is not matched by PropagateEmptyRelationAfterAQE")) {
2344+
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4412")) {
23452345
def checkNumUnion(plan: SparkPlan, numUnion: Int): Unit = {
23462346
assert(
23472347
collect(plan) {
@@ -2384,16 +2384,14 @@ index 272be70f9fe..5ea9540ddf8 100644
23842384
}.size == 1)
23852385
checkAnswer(ds, testData)
23862386
}
2387-
@@ -2014,7 +2053,8 @@ class AdaptiveQueryExecSuite
2388-
}
2389-
}
2390-
2391-
- test("SPARK-34980: Support coalesce partition through union") {
2392-
+ test("SPARK-34980: Support coalesce partition through union",
2393-
+ IgnoreComet("Comet changes the plan structure asserted by this test")) {
2394-
def checkResultPartition(
2395-
df: Dataset[Row],
2396-
numUnion: Int,
2387+
@@ -2023,6 +2062,7 @@ class AdaptiveQueryExecSuite
2388+
df.collect()
2389+
assert(collect(df.queryExecution.executedPlan) {
2390+
case u: UnionExec => u
2391+
+ case u: CometUnionExec => u.originalPlan.asInstanceOf[UnionExec]
2392+
}.size == numUnion)
2393+
assert(collect(df.queryExecution.executedPlan) {
2394+
case r: AQEShuffleReadExec => r
23972395
@@ -2150,7 +2190,8 @@ class AdaptiveQueryExecSuite
23982396
}
23992397
}

dev/diffs/4.1.1.diff

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2164,7 +2164,7 @@ index a3cfdc5a240..3793b6191bf 100644
21642164
})
21652165
checkAnswer(distinctWithId, Seq(Row(1, 0), Row(1, 0)))
21662166
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
2167-
index 3e7d26f74bd..a6c0ddfba2d 100644
2167+
index 3e7d26f74bd..6354c9a62e9 100644
21682168
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
21692169
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
21702170
@@ -27,12 +27,14 @@ import org.apache.spark.SparkException
@@ -2448,7 +2448,7 @@ index 3e7d26f74bd..a6c0ddfba2d 100644
24482448

24492449
- test("SPARK-35442: Support propagate empty relation through aggregate") {
24502450
+ test("SPARK-35442: Support propagate empty relation through aggregate",
2451-
+ IgnoreComet("Comet aggregate is not matched by PropagateEmptyRelationAfterAQE")) {
2451+
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4412")) {
24522452
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
24532453
val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult(
24542454
"SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key")
@@ -2458,7 +2458,7 @@ index 3e7d26f74bd..a6c0ddfba2d 100644
24582458

24592459
- test("SPARK-35442: Support propagate empty relation through union") {
24602460
+ test("SPARK-35442: Support propagate empty relation through union",
2461-
+ IgnoreComet("Comet aggregate is not matched by PropagateEmptyRelationAfterAQE")) {
2461+
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4412")) {
24622462
def checkNumUnion(plan: SparkPlan, numUnion: Int): Unit = {
24632463
assert(
24642464
collect(plan) {
@@ -2501,16 +2501,14 @@ index 3e7d26f74bd..a6c0ddfba2d 100644
25012501
}.size == 1)
25022502
checkAnswer(ds, testData)
25032503
}
2504-
@@ -2187,7 +2226,8 @@ class AdaptiveQueryExecSuite
2505-
}
2506-
}
2507-
2508-
- test("SPARK-34980: Support coalesce partition through union") {
2509-
+ test("SPARK-34980: Support coalesce partition through union",
2510-
+ IgnoreComet("Comet changes the plan structure asserted by this test")) {
2511-
def checkResultPartition(
2512-
df: Dataset[Row],
2513-
numUnion: Int,
2504+
@@ -2196,6 +2235,7 @@ class AdaptiveQueryExecSuite
2505+
df.collect()
2506+
assert(collect(df.queryExecution.executedPlan) {
2507+
case u: UnionExec => u
2508+
+ case u: CometUnionExec => u.originalPlan.asInstanceOf[UnionExec]
2509+
}.size == numUnion)
2510+
assert(collect(df.queryExecution.executedPlan) {
2511+
case r: AQEShuffleReadExec => r
25142512
@@ -2331,7 +2371,8 @@ class AdaptiveQueryExecSuite
25152513
}
25162514
}
@@ -2611,16 +2609,14 @@ index 3e7d26f74bd..a6c0ddfba2d 100644
26112609
}.isEmpty)
26122610
assert(collect(initialExecutedPlan) {
26132611
case i: InMemoryTableScanLike => i
2614-
@@ -3372,7 +3419,8 @@ class AdaptiveQueryExecSuite
2615-
}
2616-
}
2617-
2618-
- test("SPARK-52921: Specify outputPartitioning for UnionExec for same output partitoning") {
2619-
+ test("SPARK-52921: Specify outputPartitioning for UnionExec for same output partitoning",
2620-
+ IgnoreComet("Comet changes the plan structure asserted by this test")) {
2621-
def checkResultPartition(
2622-
df: Dataset[Row],
2623-
numUnion: Int,
2612+
@@ -3381,6 +3428,7 @@ class AdaptiveQueryExecSuite
2613+
df.collect()
2614+
assert(collect(df.queryExecution.executedPlan) {
2615+
case u: UnionExec => u
2616+
+ case u: CometUnionExec => u.originalPlan.asInstanceOf[UnionExec]
2617+
}.size == numUnion)
2618+
assert(collect(df.queryExecution.executedPlan) {
2619+
case r: AQEShuffleReadExec => r
26242620
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
26252621
index 47b935a2880..3e9b87f5c32 100644
26262622
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala

0 commit comments

Comments
 (0)