Skip to content

Commit b21275f

Browse files
committed
fix: resolve CI failures for grace hash join PR
- Rename CometGraceHashJoinExec back to CometHashJoinExec to avoid breaking spark-sql test diffs that reference CometHashJoinExec - Add output_batches and join_time metrics to GraceHashJoinMetrics (both fast and slow paths) - Fix clippy type_complexity warning on create_fast_path_stream
1 parent c08c65f commit b21275f

5 files changed

Lines changed: 23 additions & 9 deletions

File tree

docs/source/contributor-guide/grace-hash-join-design.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ The configured threshold is the total budget across all concurrent tasks on the
5252
SortMergeJoinExec
5353
-> RewriteJoin converts to ShuffledHashJoinExec (removes input sorts)
5454
-> CometExecRule wraps as CometHashJoinExec
55-
-> CometHashJoinExec.createExec() creates CometGraceHashJoinExec
55+
-> CometHashJoinExec.createExec() creates CometHashJoinExec
5656
-> Serialized to protobuf via JNI
5757
-> PhysicalPlanner (Rust) creates GraceHashJoinExec
5858
```

native/core/src/execution/operators/grace_hash_join.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,10 @@ struct GraceHashJoinMetrics {
429429
input_rows: Count,
430430
/// Number of probe-side input batches
431431
input_batches: Count,
432+
/// Number of output batches
433+
output_batches: Count,
434+
/// Time spent in per-partition joins
435+
join_time: Time,
432436
}
433437

434438
impl GraceHashJoinMetrics {
@@ -444,6 +448,8 @@ impl GraceHashJoinMetrics {
444448
.counter("build_input_batches", partition),
445449
input_rows: MetricBuilder::new(metrics).counter("input_rows", partition),
446450
input_batches: MetricBuilder::new(metrics).counter("input_batches", partition),
451+
output_batches: MetricBuilder::new(metrics).counter("output_batches", partition),
452+
join_time: MetricBuilder::new(metrics).subset_time("join_time", partition),
447453
}
448454
}
449455
}
@@ -821,8 +827,12 @@ async fn execute_grace_hash_join(
821827
)?;
822828

823829
let output_metrics = metrics.baseline.clone();
830+
let output_batch_count = metrics.output_batches.clone();
831+
let join_time = metrics.join_time.clone();
824832
let result_stream = stream.inspect_ok(move |batch| {
833+
let _timer = join_time.timer();
825834
output_metrics.record_output(batch.num_rows());
835+
output_batch_count.add(1);
826836
});
827837

828838
return Ok(result_stream.boxed());
@@ -1072,7 +1082,7 @@ fn partition_from_buffer(
10721082
}
10731083

10741084
/// Create the fast-path HashJoinExec stream (no partitioning, no spilling).
1075-
#[allow(clippy::too_many_arguments)]
1085+
#[allow(clippy::too_many_arguments, clippy::type_complexity)]
10761086
fn create_fast_path_stream(
10771087
build_data: Vec<RecordBatch>,
10781088
probe_stream: SendableRecordBatchStream,
@@ -1303,6 +1313,8 @@ async fn execute_slow_path(
13031313
drop(tx);
13041314

13051315
let output_metrics = metrics.baseline.clone();
1316+
let output_batch_count = metrics.output_batches.clone();
1317+
let join_time = metrics.join_time.clone();
13061318
let output_row_count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
13071319
let counter = Arc::clone(&output_row_count);
13081320
let jt = join_type;
@@ -1311,7 +1323,9 @@ async fn execute_slow_path(
13111323
rx.recv().await.map(|batch| (batch, rx))
13121324
})
13131325
.inspect_ok(move |batch| {
1326+
let _timer = join_time.timer();
13141327
output_metrics.record_output(batch.num_rows());
1328+
output_batch_count.add(1);
13151329
let prev = counter.fetch_add(batch.num_rows(), std::sync::atomic::Ordering::Relaxed);
13161330
let new_total = prev + batch.num_rows();
13171331
// Log every ~1M rows to detect exploding joins

spark/src/main/scala/org/apache/spark/sql/comet/operators.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1724,7 +1724,7 @@ object CometHashJoinExec extends CometOperatorSerde[HashJoin] with CometHashJoin
17241724
doConvert(join, builder, childOp: _*)
17251725

17261726
override def createExec(nativeOp: Operator, op: HashJoin): CometNativeExec = {
1727-
CometGraceHashJoinExec(
1727+
CometHashJoinExec(
17281728
nativeOp,
17291729
op,
17301730
op.output,
@@ -1740,7 +1740,7 @@ object CometHashJoinExec extends CometOperatorSerde[HashJoin] with CometHashJoin
17401740
}
17411741
}
17421742

1743-
case class CometGraceHashJoinExec(
1743+
case class CometHashJoinExec(
17441744
override val nativeOp: Operator,
17451745
override val originalPlan: SparkPlan,
17461746
override val output: Seq[Attribute],
@@ -1774,7 +1774,7 @@ case class CometGraceHashJoinExec(
17741774

17751775
override def equals(obj: Any): Boolean = {
17761776
obj match {
1777-
case other: CometGraceHashJoinExec =>
1777+
case other: CometHashJoinExec =>
17781778
this.output == other.output &&
17791779
this.leftKeys == other.leftKeys &&
17801780
this.rightKeys == other.rightKeys &&

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -692,7 +692,7 @@ class CometExecSuite extends CometTestBase {
692692
df.collect()
693693

694694
val metrics = find(df.queryExecution.executedPlan) {
695-
case _: CometGraceHashJoinExec => true
695+
case _: CometHashJoinExec => true
696696
case _ => false
697697
}.map(_.metrics).get
698698

spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.scalatest.Tag
2727
import org.apache.spark.sql.CometTestBase
2828
import org.apache.spark.sql.catalyst.TableIdentifier
2929
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
30-
import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec, CometGraceHashJoinExec}
30+
import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec, CometHashJoinExec}
3131
import org.apache.spark.sql.internal.SQLConf
3232
import org.apache.spark.sql.types.{DataTypes, Decimal, StructField, StructType}
3333

@@ -592,13 +592,13 @@ class CometJoinSuite extends CometTestBase {
592592
}
593593
}
594594

595-
test("Grace HashJoin - plan shows CometGraceHashJoinExec") {
595+
test("Grace HashJoin - plan shows CometHashJoinExec") {
596596
withSQLConf(graceHashJoinConf: _*) {
597597
withParquetTable((0 until 50).map(i => (i, i % 5)), "tbl_a") {
598598
withParquetTable((0 until 50).map(i => (i % 10, i + 2)), "tbl_b") {
599599
val df = sql(
600600
"SELECT /*+ SHUFFLE_HASH(tbl_a) */ * FROM tbl_a JOIN tbl_b ON tbl_a._2 = tbl_b._1")
601-
checkSparkAnswerAndOperator(df, Seq(classOf[CometGraceHashJoinExec]))
601+
checkSparkAnswerAndOperator(df, Seq(classOf[CometHashJoinExec]))
602602
}
603603
}
604604
}

0 commit comments

Comments
 (0)