Skip to content

Commit 80f1176

Browse files
committed
Fix VectorSchemaRoot allocation.
1 parent 43d3920 commit 80f1176

1 file changed

Lines changed: 21 additions & 5 deletions

File tree

  • common/src/main/scala/org/apache/spark/sql/comet/util

common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala

Lines changed: 21 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -256,8 +256,13 @@ object Utils extends CometTypeShim with Logging {
256256

257257
/**
258258
* Coalesces many small ChunkedByteBuffers (one per source partition) into a single
259-
* ChunkedByteBuffer containing one Arrow IPC stream with one record batch. This avoids each
260-
* consumer partition having to deserialize N separate streams.
259+
* ChunkedByteBuffer. Without coalescing, each consumer task in a broadcast hash join
260+
* deserializes N separate Arrow IPC streams (one per source partition), which dominates
261+
* build-side time when partition counts are high (e.g. 200+ partitions in TPC-H Q18).
262+
*
263+
* We decode and append all source batches into one VectorSchemaRoot on the driver,
264+
* then re-serialize once via ArrowStreamWriter. This is done on the driver (not per-task)
265+
* so the cost is paid once rather than once per consumer partition.
261266
*/
262267
def coalesceBroadcastBatches(input: Iterator[ChunkedByteBuffer]): Array[ChunkedByteBuffer] = {
263268
val buffers = input.filterNot(_.size == 0).toArray
@@ -280,10 +285,18 @@ object Utils extends CometTypeShim with Logging {
280285
val channel = Channels.newChannel(ins)
281286
val reader = new ArrowStreamReader(channel, allocator)
282287
try {
288+
// Dictionary-encoded columns would be unsafe to coalesce because each
289+
// partition can have a different dictionary, and appending index vectors
290+
// would silently mix indices from incompatible dictionaries.
291+
// DataFusion decodes dictionaries during execution, so this shouldn't happen.
292+
assert(
293+
reader.getDictionaryVectors.isEmpty,
294+
"Cannot coalesce dictionary-encoded broadcast batches")
283295
while (reader.loadNextBatch()) {
284296
val sourceRoot = reader.getVectorSchemaRoot
285297
if (targetRoot == null) {
286298
targetRoot = VectorSchemaRoot.create(sourceRoot.getSchema, allocator)
299+
targetRoot.allocateNew()
287300
}
288301
VectorSchemaRootAppender.append(targetRoot, sourceRoot)
289302
totalRows += sourceRoot.getRowCount
@@ -308,9 +321,12 @@ object Utils extends CometTypeShim with Logging {
308321
val cbbos = new ChunkedByteBufferOutputStream(1024 * 1024, ByteBuffer.allocate)
309322
val out = new DataOutputStream(outCodec.compressedOutputStream(cbbos))
310323
val writer = new ArrowStreamWriter(targetRoot, null, Channels.newChannel(out))
311-
writer.start()
312-
writer.writeBatch()
313-
writer.close()
324+
try {
325+
writer.start()
326+
writer.writeBatch()
327+
} finally {
328+
writer.close()
329+
}
314330

315331
Array(cbbos.toChunkedByteBuffer)
316332
} finally {

0 commit comments

Comments
 (0)