Skip to content

Commit 12ded1d

Browse files
committed
Add metrics, spotless.
1 parent bfd3f6b commit 12ded1d

3 files changed

Lines changed: 32 additions & 11 deletions

File tree

common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import java.nio.channels.Channels
2626
import scala.jdk.CollectionConverters._
2727

2828
import org.apache.arrow.c.CDataDictionaryProvider
29-
import org.apache.arrow.vector.{BigIntVector, BitVector, DateDayVector, DecimalVector, FieldVector, FixedSizeBinaryVector, Float4Vector, Float8Vector, IntVector, NullVector, SmallIntVector, TimeStampMicroTZVector, TimeStampMicroVector, TinyIntVector, ValueVector, VarBinaryVector, VarCharVector, VectorSchemaRoot}
29+
import org.apache.arrow.vector._
3030
import org.apache.arrow.vector.complex.{ListVector, MapVector, StructVector}
3131
import org.apache.arrow.vector.dictionary.DictionaryProvider
3232
import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter}
@@ -234,6 +234,7 @@ object Utils extends CometTypeShim with Logging {
234234

235235
/**
236236
* Decodes the byte arrays back to ColumnarBatchs and put them into buffer.
237+
*
237238
* @param bytes
238239
* the serialized batches
239240
* @param source
@@ -264,10 +265,11 @@ object Utils extends CometTypeShim with Logging {
264265
* re-serialize once via ArrowStreamWriter. This is done on the driver (not per-task) so the
265266
* cost is paid once rather than once per consumer partition.
266267
*/
267-
def coalesceBroadcastBatches(input: Iterator[ChunkedByteBuffer]): Array[ChunkedByteBuffer] = {
268+
def coalesceBroadcastBatches(
269+
input: Iterator[ChunkedByteBuffer]): (Array[ChunkedByteBuffer], Long, Long) = {
268270
val buffers = input.filterNot(_.size == 0).toArray
269271
if (buffers.isEmpty) {
270-
return Array.empty
272+
return (Array.empty, 0L, 0L)
271273
}
272274

273275
val allocator = org.apache.comet.CometArrowAllocator
@@ -308,7 +310,7 @@ object Utils extends CometTypeShim with Logging {
308310
}
309311

310312
if (targetRoot == null) {
311-
return Array.empty
313+
return (Array.empty, 0L, 0L)
312314
}
313315

314316
assert(
@@ -320,7 +322,7 @@ object Utils extends CometTypeShim with Logging {
320322
val outCodec = CompressionCodec.createCodec(SparkEnv.get.conf)
321323
val cbbos = new ChunkedByteBufferOutputStream(1024 * 1024, ByteBuffer.allocate)
322324
val out = new DataOutputStream(outCodec.compressedOutputStream(cbbos))
323-
// null provider is safe here we assert no dictionary-encoded columns above
325+
// null provider is safe here because we assert no dictionary-encoded columns above
324326
val writer = new ArrowStreamWriter(targetRoot, null, Channels.newChannel(out))
325327
try {
326328
writer.start()
@@ -329,7 +331,7 @@ object Utils extends CometTypeShim with Logging {
329331
writer.close()
330332
}
331333

332-
Array(cbbos.toChunkedByteBuffer)
334+
(Array(cbbos.toChunkedByteBuffer), batchCount.toLong, totalRows)
333335
} finally {
334336
if (targetRoot != null) {
335337
targetRoot.close()

spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,13 @@ case class CometBroadcastExchangeExec(
7777
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
7878
"collectTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to collect"),
7979
"buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build"),
80-
"broadcastTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to broadcast"))
80+
"broadcastTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to broadcast"),
81+
"numCoalescedBatches" -> SQLMetrics.createMetric(
82+
sparkContext,
83+
"number of coalesced batches for broadcast"),
84+
"numCoalescedRows" -> SQLMetrics.createMetric(
85+
sparkContext,
86+
"number of coalesced rows for broadcast"))
8187

8288
override def doCanonicalize(): SparkPlan = {
8389
CometBroadcastExchangeExec(null, null, mode, child.canonicalized)
@@ -157,7 +163,9 @@ case class CometBroadcastExchangeExec(
157163

158164
// Coalesce many small per-partition buffers into a single buffer so each
159165
// consumer partition only deserializes one Arrow IPC stream.
160-
val batches = Utils.coalesceBroadcastBatches(input)
166+
val (batches, coalescedBatches, coalescedRows) = Utils.coalesceBroadcastBatches(input)
167+
longMetric("numCoalescedBatches") += coalescedBatches
168+
longMetric("numCoalescedRows") += coalescedRows
161169

162170
val dataSize = batches.map(_.size).sum
163171

spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -427,13 +427,24 @@ class CometJoinSuite extends CometTestBase {
427427
// Without coalescing, build_input_batches would be ~numPartitions per task,
428428
// totaling ~numPartitions * numPartitions across all tasks.
429429
// With coalescing, each task gets 1 batch, so total ≈ numPartitions.
430-
// scalastyle:off println
431-
println(s"Build-side metrics: batches=$buildBatches, rows=$buildRows")
432-
// scalastyle:on println
433430
assert(
434431
buildBatches <= numPartitions,
435432
s"Expected at most $numPartitions build batches (1 per task), got $buildBatches. " +
436433
"Broadcast batch coalescing may not be working.")
434+
435+
val broadcasts = collect(df2.queryExecution.executedPlan) {
436+
case b: CometBroadcastExchangeExec => b
437+
}
438+
assert(broadcasts.nonEmpty, "Expected CometBroadcastExchangeExec in plan")
439+
440+
val broadcast = broadcasts.head
441+
val coalescedBatches = broadcast.metrics("numCoalescedBatches").value
442+
val coalescedRows = broadcast.metrics("numCoalescedRows").value
443+
444+
assert(
445+
coalescedBatches >= numPartitions,
446+
s"Expected at least $numPartitions coalesced batches, got $coalescedBatches")
447+
assert(coalescedRows == 10000, s"Expected 10000 coalesced rows, got $coalescedRows")
437448
}
438449
}
439450
}

0 commit comments

Comments
 (0)