Skip to content

Commit e6ef10b

Browse files
authored
build: add spark-4.1 Maven profile and shim sources (apache#4097)
1 parent fc29bc3 commit e6ef10b

37 files changed

Lines changed: 1898 additions & 9 deletions

.github/workflows/pr_build_linux.yml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,38 @@ jobs:
140140
run: |
141141
./dev/ci/check-working-tree-clean.sh
142142
143+
# Compile-only verification for Spark 4.1. Tests are intentionally skipped: the spark-4.1
144+
# profile is currently a build target only, and several runtime/test failures are tracked
145+
# in follow-up PRs. Excluded from lint-java because semanticdb-scalac_2.13.17 is not yet
146+
# published and the lint job activates -Psemanticdb.
147+
build-spark-4-1:
148+
needs: lint
149+
name: Build Spark 4.1, JDK 17
150+
runs-on: ubuntu-latest
151+
container:
152+
image: amd64/rust
153+
steps:
154+
- uses: actions/checkout@v6
155+
156+
- name: Setup Rust & Java toolchain
157+
uses: ./.github/actions/setup-builder
158+
with:
159+
rust-version: ${{ env.RUST_VERSION }}
160+
jdk-version: 17
161+
162+
- name: Cache Maven dependencies
163+
uses: actions/cache@v5
164+
with:
165+
path: |
166+
~/.m2/repository
167+
/root/.m2/repository
168+
key: ${{ runner.os }}-java-maven-${{ hashFiles('**/pom.xml') }}-spark-4.1-build
169+
restore-keys: |
170+
${{ runner.os }}-java-maven-
171+
172+
- name: Compile (skip tests)
173+
run: ./mvnw -B install -DskipTests -Dmaven.test.skip=true -Pspark-4.1
174+
143175
# Build native library once and share with all test jobs
144176
build-native:
145177
needs: lint
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.shims
21+
22+
import org.apache.spark.sql.execution.datasources.VariantMetadata
23+
import org.apache.spark.sql.types.{DataType, StringType, StructType}
24+
25+
trait CometTypeShim {
26+
// A `StringType` carries collation metadata in Spark 4.0. Only non-default (non-UTF8_BINARY)
27+
// collations have semantics Comet's byte-level hashing/sorting/equality cannot honor. The
28+
// default `StringType` object is `StringType(UTF8_BINARY_COLLATION_ID)`, so comparing
29+
// `collationId` against that instance's id picks out non-default collations without needing
30+
// `private[sql]` helpers on `StringType`.
31+
def isStringCollationType(dt: DataType): Boolean = dt match {
32+
case st: StringType => st.collationId != StringType.collationId
33+
case _ => false
34+
}
35+
36+
// Spark 4.0's `PushVariantIntoScan` rewrites `VariantType` columns into a `StructType` whose
37+
// fields each carry `__VARIANT_METADATA_KEY` metadata, then pushes `variant_get` paths down as
38+
// ordinary struct field accesses. Comet's native scans don't understand the on-disk Parquet
39+
// variant shredding layout, so reading such a struct natively returns nulls. Detect the marker
40+
// and force scan fallback.
41+
def isVariantStruct(s: StructType): Boolean = VariantMetadata.isVariantStruct(s)
42+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.shims
21+
22+
import org.apache.spark.paths.SparkPath
23+
import org.apache.spark.sql.catalyst.InternalRow
24+
import org.apache.spark.sql.execution.datasources.PartitionedFile
25+
26+
object ShimBatchReader {
27+
def newPartitionedFile(partitionValues: InternalRow, file: String): PartitionedFile =
28+
PartitionedFile(
29+
partitionValues,
30+
SparkPath.fromUrlString(file),
31+
-1, // -1 means we read the entire file
32+
-1,
33+
Array.empty[String],
34+
0,
35+
0)
36+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.shims
21+
22+
trait ShimCometConf {
23+
protected val COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT = true
24+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.shims
21+
22+
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
23+
import org.apache.spark.sql.execution.datasources.parquet.ParquetRowIndexUtil
24+
import org.apache.spark.sql.types.StructType
25+
26+
object ShimFileFormat {
27+
// A name for a temporary column that holds row indexes computed by the file format reader
28+
// until they can be placed in the _metadata struct.
29+
val ROW_INDEX_TEMPORARY_COLUMN_NAME = ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
30+
31+
def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int =
32+
ParquetRowIndexUtil.findRowIndexColumnIndexInSchema(sparkSchema)
33+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.comet.shims
21+
22+
import org.apache.spark.executor.TaskMetrics
23+
import org.apache.spark.util.AccumulatorV2
24+
25+
object ShimTaskMetrics {
26+
27+
def getTaskAccumulator(taskMetrics: TaskMetrics): Option[AccumulatorV2[_, _]] =
28+
taskMetrics._externalAccums.lastOption
29+
}

pom.xml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -682,6 +682,30 @@ under the License.
682682
</properties>
683683
</profile>
684684

685+
<profile>
686+
<!-- WIP: Spark 4.1 support, with its own shim sources for 4.1-specific APIs -->
687+
<id>spark-4.1</id>
688+
<properties>
689+
<!-- Spark 4.1.1 is compiled against Scala 2.13.17 and emits calls into stdlib methods
690+
added in that release (e.g. MurmurHash3$.caseClassHash$default$2()). Comet must
691+
match to avoid runtime NoSuchMethodError. Note: semanticdb-scalac_2.13.17 is not
692+
yet published, so the -Psemanticdb / scalafix lint job is skipped for spark-4.1. -->
693+
<scala.version>2.13.17</scala.version>
694+
<scala.binary.version>2.13</scala.binary.version>
695+
<spark.version>4.1.1</spark.version>
696+
<spark.version.short>4.1</spark.version.short>
697+
<parquet.version>1.16.0</parquet.version>
698+
<semanticdb.version>4.13.6</semanticdb.version>
699+
<slf4j.version>2.0.17</slf4j.version>
700+
<shims.majorVerSrc>spark-4.1</shims.majorVerSrc>
701+
<shims.minorVerSrc>not-needed-yet</shims.minorVerSrc>
702+
<!-- Use jdk17 by default -->
703+
<java.version>17</java.version>
704+
<maven.compiler.source>${java.version}</maven.compiler.source>
705+
<maven.compiler.target>${java.version}</maven.compiler.target>
706+
</properties>
707+
</profile>
708+
685709
<profile>
686710
<id>scala-2.12</id>
687711
</profile>

spark/pom.xml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,31 @@ under the License.
273273
</dependency>
274274
</dependencies>
275275
</profile>
276+
<profile>
277+
<id>spark-4.1</id>
278+
<dependencies>
279+
<!-- iceberg-spark-runtime-4.1 is not yet published; reuse the 4.0 runtime -->
280+
<dependency>
281+
<groupId>org.apache.iceberg</groupId>
282+
<artifactId>iceberg-spark-runtime-4.0_${scala.binary.version}</artifactId>
283+
<version>1.10.0</version>
284+
<scope>test</scope>
285+
</dependency>
286+
<!-- Jetty 11.x for Spark 4.1 (jakarta.servlet); matches Spark 4.1.1's jetty.version -->
287+
<dependency>
288+
<groupId>org.eclipse.jetty</groupId>
289+
<artifactId>jetty-server</artifactId>
290+
<version>11.0.26</version>
291+
<scope>test</scope>
292+
</dependency>
293+
<dependency>
294+
<groupId>org.eclipse.jetty</groupId>
295+
<artifactId>jetty-servlet</artifactId>
296+
<version>11.0.26</version>
297+
<scope>test</scope>
298+
</dependency>
299+
</dependencies>
300+
</profile>
276301
<profile>
277302
<id>generate-docs</id>
278303
<build>

spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import org.apache.spark.memory.TaskMemoryManager;
4242
import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper;
4343
import org.apache.spark.scheduler.MapStatus;
44-
import org.apache.spark.scheduler.MapStatus$;
4544
import org.apache.spark.serializer.SerializerInstance;
4645
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
4746
import org.apache.spark.shuffle.ShuffleWriter;
@@ -171,8 +170,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
171170
mapOutputWriter
172171
.commitAllPartitions(ShuffleChecksumHelper.EMPTY_CHECKSUM_VALUE)
173172
.getPartitionLengths();
174-
mapStatus =
175-
MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
173+
mapStatus = MapStatusHelper.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
176174
return;
177175
}
178176
final long openStartTime = System.nanoTime();
@@ -262,7 +260,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
262260

263261
// TODO: We probably can move checksum generation here when concatenating partition files
264262
partitionLengths = writePartitionedData(mapOutputWriter);
265-
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
263+
mapStatus = MapStatusHelper.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
266264
} catch (Exception e) {
267265
try {
268266
mapOutputWriter.abort(e);

spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometUnsafeShuffleWriter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper;
5151
import org.apache.spark.network.util.LimitedInputStream;
5252
import org.apache.spark.scheduler.MapStatus;
53-
import org.apache.spark.scheduler.MapStatus$;
5453
import org.apache.spark.serializer.SerializationStream;
5554
import org.apache.spark.serializer.SerializerInstance;
5655
import org.apache.spark.shuffle.BaseShuffleHandle;
@@ -288,7 +287,7 @@ void closeAndWriteOutput() throws IOException {
288287
}
289288
}
290289
}
291-
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
290+
mapStatus = MapStatusHelper.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
292291
}
293292

294293
@VisibleForTesting

0 commit comments

Comments
 (0)