Skip to content

Commit 0520239

Browse files
authored
Merge branch 'main' into spark-4.0-support
2 parents da66599 + f85f697 commit 0520239

50 files changed

Lines changed: 2841 additions & 958 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/iceberg_spark_test.yml

Lines changed: 0 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -102,138 +102,6 @@ jobs:
102102
path: native/target/ci/libcomet.so
103103
retention-days: 1
104104

105-
iceberg-spark:
106-
needs: build-native
107-
if: contains(github.event.pull_request.title, '[iceberg]')
108-
strategy:
109-
matrix:
110-
os: [ubuntu-24.04]
111-
java-version: [11, 17]
112-
iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}, {short: '1.10', full: '1.10.0'}]
113-
spark-version: [{short: '3.5', full: '3.5.8'}]
114-
scala-version: ['2.13']
115-
fail-fast: false
116-
name: iceberg-spark/${{ matrix.os }}/iceberg-${{ matrix.iceberg-version.full }}/spark-${{ matrix.spark-version.full }}/scala-${{ matrix.scala-version }}/java-${{ matrix.java-version }}
117-
runs-on: ${{ matrix.os }}
118-
container:
119-
image: amd64/rust
120-
env:
121-
SPARK_LOCAL_IP: localhost
122-
steps:
123-
- uses: actions/checkout@v6
124-
- name: Setup Rust & Java toolchain
125-
uses: ./.github/actions/setup-builder
126-
with:
127-
rust-version: ${{env.RUST_VERSION}}
128-
jdk-version: ${{ matrix.java-version }}
129-
- name: Download native library
130-
uses: actions/download-artifact@v8
131-
with:
132-
name: native-lib-iceberg
133-
path: native/target/release/
134-
- name: Build Comet
135-
run: |
136-
./mvnw install -Prelease -DskipTests -Pspark-${{ matrix.spark-version.short }} -Pscala-${{ matrix.scala-version }}
137-
- name: Setup Iceberg
138-
uses: ./.github/actions/setup-iceberg-builder
139-
with:
140-
iceberg-version: ${{ matrix.iceberg-version.full }}
141-
- name: Run Iceberg Spark tests
142-
run: |
143-
cd apache-iceberg
144-
rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
145-
ENABLE_COMET=true ENABLE_COMET_ONHEAP=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \
146-
:iceberg-spark:iceberg-spark-${{ matrix.spark-version.short }}_${{ matrix.scala-version }}:test \
147-
-Pquick=true -x javadoc
148-
149-
iceberg-spark-extensions:
150-
needs: build-native
151-
if: contains(github.event.pull_request.title, '[iceberg]')
152-
strategy:
153-
matrix:
154-
os: [ubuntu-24.04]
155-
java-version: [11, 17]
156-
iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}, {short: '1.10', full: '1.10.0'}]
157-
spark-version: [{short: '3.5', full: '3.5.8'}]
158-
scala-version: ['2.13']
159-
fail-fast: false
160-
name: iceberg-spark-extensions/${{ matrix.os }}/iceberg-${{ matrix.iceberg-version.full }}/spark-${{ matrix.spark-version.full }}/scala-${{ matrix.scala-version }}/java-${{ matrix.java-version }}
161-
runs-on: ${{ matrix.os }}
162-
container:
163-
image: amd64/rust
164-
env:
165-
SPARK_LOCAL_IP: localhost
166-
steps:
167-
- uses: actions/checkout@v6
168-
- name: Setup Rust & Java toolchain
169-
uses: ./.github/actions/setup-builder
170-
with:
171-
rust-version: ${{env.RUST_VERSION}}
172-
jdk-version: ${{ matrix.java-version }}
173-
- name: Download native library
174-
uses: actions/download-artifact@v8
175-
with:
176-
name: native-lib-iceberg
177-
path: native/target/release/
178-
- name: Build Comet
179-
run: |
180-
./mvnw install -Prelease -DskipTests -Pspark-${{ matrix.spark-version.short }} -Pscala-${{ matrix.scala-version }}
181-
- name: Setup Iceberg
182-
uses: ./.github/actions/setup-iceberg-builder
183-
with:
184-
iceberg-version: ${{ matrix.iceberg-version.full }}
185-
- name: Run Iceberg Spark extensions tests
186-
run: |
187-
cd apache-iceberg
188-
rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
189-
ENABLE_COMET=true ENABLE_COMET_ONHEAP=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \
190-
:iceberg-spark:iceberg-spark-extensions-${{ matrix.spark-version.short }}_${{ matrix.scala-version }}:test \
191-
-Pquick=true -x javadoc
192-
193-
iceberg-spark-runtime:
194-
needs: build-native
195-
if: contains(github.event.pull_request.title, '[iceberg]')
196-
strategy:
197-
matrix:
198-
os: [ubuntu-24.04]
199-
java-version: [11, 17]
200-
iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}, {short: '1.10', full: '1.10.0'}]
201-
spark-version: [{short: '3.5', full: '3.5.8'}]
202-
scala-version: ['2.13']
203-
fail-fast: false
204-
name: iceberg-spark-runtime/${{ matrix.os }}/iceberg-${{ matrix.iceberg-version.full }}/spark-${{ matrix.spark-version.full }}/scala-${{ matrix.scala-version }}/java-${{ matrix.java-version }}
205-
runs-on: ${{ matrix.os }}
206-
container:
207-
image: amd64/rust
208-
env:
209-
SPARK_LOCAL_IP: localhost
210-
steps:
211-
- uses: actions/checkout@v6
212-
- name: Setup Rust & Java toolchain
213-
uses: ./.github/actions/setup-builder
214-
with:
215-
rust-version: ${{env.RUST_VERSION}}
216-
jdk-version: ${{ matrix.java-version }}
217-
- name: Download native library
218-
uses: actions/download-artifact@v8
219-
with:
220-
name: native-lib-iceberg
221-
path: native/target/release/
222-
- name: Build Comet
223-
run: |
224-
./mvnw install -Prelease -DskipTests -Pspark-${{ matrix.spark-version.short }} -Pscala-${{ matrix.scala-version }}
225-
- name: Setup Iceberg
226-
uses: ./.github/actions/setup-iceberg-builder
227-
with:
228-
iceberg-version: ${{ matrix.iceberg-version.full }}
229-
- name: Run Iceberg Spark runtime tests
230-
run: |
231-
cd apache-iceberg
232-
rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
233-
ENABLE_COMET=true ENABLE_COMET_ONHEAP=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \
234-
:iceberg-spark:iceberg-spark-runtime-${{ matrix.spark-version.short }}_${{ matrix.scala-version }}:integrationTest \
235-
-Pquick=true -x javadoc
236-
237105
iceberg-spark-rust:
238106
needs: build-native
239107
if: contains(github.event.pull_request.title, '[iceberg]')

.github/workflows/pr_rat_check.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ jobs:
3737
name: RAT License Check
3838
runs-on: ubuntu-slim
3939
steps:
40-
- uses: actions/checkout@v4
40+
- uses: actions/checkout@v6
4141
- name: Set up Java
42-
uses: actions/setup-java@v4
42+
uses: actions/setup-java@v5
4343
with:
4444
distribution: temurin
4545
java-version: 11

common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala

Lines changed: 108 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,15 @@ import java.nio.channels.Channels
2626
import scala.jdk.CollectionConverters._
2727

2828
import org.apache.arrow.c.CDataDictionaryProvider
29-
import org.apache.arrow.vector.{BigIntVector, BitVector, DateDayVector, DecimalVector, FieldVector, FixedSizeBinaryVector, Float4Vector, Float8Vector, IntVector, NullVector, SmallIntVector, TimeStampMicroTZVector, TimeStampMicroVector, TinyIntVector, ValueVector, VarBinaryVector, VarCharVector, VectorSchemaRoot}
29+
import org.apache.arrow.vector._
3030
import org.apache.arrow.vector.complex.{ListVector, MapVector, StructVector}
3131
import org.apache.arrow.vector.dictionary.DictionaryProvider
32-
import org.apache.arrow.vector.ipc.ArrowStreamWriter
32+
import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter}
3333
import org.apache.arrow.vector.types._
3434
import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
35+
import org.apache.arrow.vector.util.VectorSchemaRootAppender
3536
import org.apache.spark.{SparkEnv, SparkException}
37+
import org.apache.spark.internal.Logging
3638
import org.apache.spark.io.CompressionCodec
3739
import org.apache.spark.sql.comet.execution.arrow.ArrowReaderIterator
3840
import org.apache.spark.sql.types._
@@ -43,7 +45,7 @@ import org.apache.comet.Constants.COMET_CONF_DIR_ENV
4345
import org.apache.comet.shims.CometTypeShim
4446
import org.apache.comet.vector.CometVector
4547

46-
object Utils extends CometTypeShim {
48+
object Utils extends CometTypeShim with Logging {
4749
def getConfPath(confFileName: String): String = {
4850
sys.env
4951
.get(COMET_CONF_DIR_ENV)
@@ -232,6 +234,7 @@ object Utils extends CometTypeShim {
232234

233235
/**
234236
* Decodes the byte arrays back to ColumnarBatchs and put them into buffer.
237+
*
235238
* @param bytes
236239
* the serialized batches
237240
* @param source
@@ -252,6 +255,108 @@ object Utils extends CometTypeShim {
252255
new ArrowReaderIterator(Channels.newChannel(ins), source)
253256
}
254257

258+
/**
259+
* Coalesces many small Arrow IPC batches into a single batch for broadcasting.
260+
*
261+
* Why this is necessary: The broadcast exchange collects shuffle output by calling
262+
* getByteArrayRdd, which serializes each ColumnarBatch independently into its own
263+
* ChunkedByteBuffer. The shuffle reader (CometBlockStoreShuffleReader) produces one
264+
* ColumnarBatch per shuffle block, and there is one block per writer task per output partition.
265+
* So with W writer tasks and P output partitions, the broadcast collects up to W * P tiny
266+
* batches. For example, with 400 writer tasks and 500 partitions, 1M rows would arrive as ~200K
267+
* batches of ~5 rows each.
268+
*
269+
* Without coalescing, every consumer task in the broadcast join would independently deserialize
270+
* all of these tiny Arrow IPC streams, paying per-stream overhead (schema parsing, buffer
271+
* allocation) for each one. With coalescing, we decode and append all batches into one
272+
* VectorSchemaRoot on the driver, then re-serialize once. Each consumer task then deserializes
273+
* a single Arrow IPC stream.
274+
*/
275+
def coalesceBroadcastBatches(
276+
input: Iterator[ChunkedByteBuffer]): (Array[ChunkedByteBuffer], Long, Long) = {
277+
val buffers = input.filterNot(_.size == 0).toArray
278+
if (buffers.isEmpty) {
279+
return (Array.empty, 0L, 0L)
280+
}
281+
282+
val allocator = org.apache.comet.CometArrowAllocator
283+
.newChildAllocator("broadcast-coalesce", 0, Long.MaxValue)
284+
try {
285+
var targetRoot: VectorSchemaRoot = null
286+
var totalRows = 0L
287+
var batchCount = 0
288+
289+
val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
290+
try {
291+
for (bytes <- buffers) {
292+
val compressedInputStream =
293+
new DataInputStream(codec.compressedInputStream(bytes.toInputStream()))
294+
val reader =
295+
new ArrowStreamReader(Channels.newChannel(compressedInputStream), allocator)
296+
try {
297+
// Comet decodes dictionaries during execution, so this shouldn't happen.
298+
// If it does, fall back to the original uncoalesced buffers because each
299+
// partition can have a different dictionary, and appending index vectors
300+
// would silently mix indices from incompatible dictionaries.
301+
if (!reader.getDictionaryVectors.isEmpty) {
302+
logWarning(
303+
"Unexpected dictionary-encoded column during BroadcastExchange coalescing; " +
304+
"skipping coalesce")
305+
reader.close()
306+
if (targetRoot != null) {
307+
targetRoot.close()
308+
targetRoot = null
309+
}
310+
return (buffers, 0L, 0L)
311+
}
312+
while (reader.loadNextBatch()) {
313+
val sourceRoot = reader.getVectorSchemaRoot
314+
if (targetRoot == null) {
315+
targetRoot = VectorSchemaRoot.create(sourceRoot.getSchema, allocator)
316+
targetRoot.allocateNew()
317+
}
318+
VectorSchemaRootAppender.append(targetRoot, sourceRoot)
319+
totalRows += sourceRoot.getRowCount
320+
batchCount += 1
321+
}
322+
} finally {
323+
reader.close()
324+
}
325+
}
326+
327+
if (targetRoot == null) {
328+
return (Array.empty, 0L, 0L)
329+
}
330+
331+
assert(
332+
targetRoot.getRowCount.toLong == totalRows,
333+
s"Row count mismatch after coalesce: ${targetRoot.getRowCount} != $totalRows")
334+
335+
logInfo(s"Coalesced $batchCount broadcast batches into 1 ($totalRows rows)")
336+
337+
val outputStream = new ChunkedByteBufferOutputStream(1024 * 1024, ByteBuffer.allocate)
338+
val compressedOutputStream =
339+
new DataOutputStream(codec.compressedOutputStream(outputStream))
340+
val writer =
341+
new ArrowStreamWriter(targetRoot, null, Channels.newChannel(compressedOutputStream))
342+
try {
343+
writer.start()
344+
writer.writeBatch()
345+
} finally {
346+
writer.close()
347+
}
348+
349+
(Array(outputStream.toChunkedByteBuffer), batchCount.toLong, totalRows)
350+
} finally {
351+
if (targetRoot != null) {
352+
targetRoot.close()
353+
}
354+
}
355+
} finally {
356+
allocator.close()
357+
}
358+
}
359+
255360
def getBatchFieldVectors(
256361
batch: ColumnarBatch): (Seq[FieldVector], Option[DictionaryProvider]) = {
257362
var provider: Option[DictionaryProvider] = None

0 commit comments

Comments
 (0)