Skip to content

Commit 7b7d834

Browse files
committed
fix: update join metrics tests for GraceHashJoinExec
- HashJoin test now matches CometGraceHashJoinExec and checks GHJ metrics (no build_mem_used, adds spill_count) - BroadcastHashJoin test removes build_mem_used assertion since the native side does not report this metric - Remove dead CometHashJoinExec case class (createExec already produces CometGraceHashJoinExec)
1 parent ce05e97 commit 7b7d834

2 files changed

Lines changed: 3 additions & 60 deletions

File tree

spark/src/main/scala/org/apache/spark/sql/comet/operators.scala

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1740,61 +1740,6 @@ object CometHashJoinExec extends CometOperatorSerde[HashJoin] with CometHashJoin
17401740
}
17411741
}
17421742

1743-
case class CometHashJoinExec(
1744-
override val nativeOp: Operator,
1745-
override val originalPlan: SparkPlan,
1746-
override val output: Seq[Attribute],
1747-
override val outputOrdering: Seq[SortOrder],
1748-
leftKeys: Seq[Expression],
1749-
rightKeys: Seq[Expression],
1750-
joinType: JoinType,
1751-
condition: Option[Expression],
1752-
buildSide: BuildSide,
1753-
override val left: SparkPlan,
1754-
override val right: SparkPlan,
1755-
override val serializedPlanOpt: SerializedPlan)
1756-
extends CometBinaryExec {
1757-
1758-
override def outputPartitioning: Partitioning = joinType match {
1759-
case _: InnerLike =>
1760-
PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
1761-
case LeftOuter => left.outputPartitioning
1762-
case RightOuter => right.outputPartitioning
1763-
case FullOuter => UnknownPartitioning(left.outputPartitioning.numPartitions)
1764-
case LeftExistence(_) => left.outputPartitioning
1765-
case x =>
1766-
throw new IllegalArgumentException(s"ShuffledJoin should not take $x as the JoinType")
1767-
}
1768-
1769-
override def withNewChildrenInternal(newLeft: SparkPlan, newRight: SparkPlan): SparkPlan =
1770-
this.copy(left = newLeft, right = newRight)
1771-
1772-
override def stringArgs: Iterator[Any] =
1773-
Iterator(leftKeys, rightKeys, joinType, buildSide, condition, left, right)
1774-
1775-
override def equals(obj: Any): Boolean = {
1776-
obj match {
1777-
case other: CometHashJoinExec =>
1778-
this.output == other.output &&
1779-
this.leftKeys == other.leftKeys &&
1780-
this.rightKeys == other.rightKeys &&
1781-
this.condition == other.condition &&
1782-
this.buildSide == other.buildSide &&
1783-
this.left == other.left &&
1784-
this.right == other.right &&
1785-
this.serializedPlanOpt == other.serializedPlanOpt
1786-
case _ =>
1787-
false
1788-
}
1789-
}
1790-
1791-
override def hashCode(): Int =
1792-
Objects.hashCode(output, leftKeys, rightKeys, condition, buildSide, left, right)
1793-
1794-
override lazy val metrics: Map[String, SQLMetric] =
1795-
CometMetricNode.hashJoinMetrics(sparkContext)
1796-
}
1797-
17981743
case class CometGraceHashJoinExec(
17991744
override val nativeOp: Operator,
18001745
override val originalPlan: SparkPlan,

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -692,16 +692,14 @@ class CometExecSuite extends CometTestBase {
692692
df.collect()
693693

694694
val metrics = find(df.queryExecution.executedPlan) {
695-
case _: CometHashJoinExec => true
695+
case _: CometGraceHashJoinExec => true
696696
case _ => false
697697
}.map(_.metrics).get
698698

699699
assert(metrics.contains("build_time"))
700700
assert(metrics("build_time").value > 1L)
701701
assert(metrics.contains("build_input_batches"))
702702
assert(metrics("build_input_batches").value == 5L)
703-
assert(metrics.contains("build_mem_used"))
704-
assert(metrics("build_mem_used").value > 1L)
705703
assert(metrics.contains("build_input_rows"))
706704
assert(metrics("build_input_rows").value == 5L)
707705
assert(metrics.contains("input_batches"))
@@ -714,6 +712,8 @@ class CometExecSuite extends CometTestBase {
714712
assert(metrics("output_rows").value == 5L)
715713
assert(metrics.contains("join_time"))
716714
assert(metrics("join_time").value > 1L)
715+
assert(metrics.contains("spill_count"))
716+
assert(metrics("spill_count").value == 0)
717717
}
718718
}
719719
}
@@ -733,8 +733,6 @@ class CometExecSuite extends CometTestBase {
733733
assert(metrics("build_time").value > 1L)
734734
assert(metrics.contains("build_input_batches"))
735735
assert(metrics("build_input_batches").value == 25L)
736-
assert(metrics.contains("build_mem_used"))
737-
assert(metrics("build_mem_used").value > 1L)
738736
assert(metrics.contains("build_input_rows"))
739737
assert(metrics("build_input_rows").value == 25L)
740738
assert(metrics.contains("input_batches"))

0 commit comments

Comments
 (0)