Skip to content

Commit 4678606

Browse files
authored
[runners-spark] Prep shared base for Spark 4 (#38324)
1 parent 083e579 commit 4678606

11 files changed

Lines changed: 39 additions & 20 deletions

File tree

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -649,6 +649,7 @@ class BeamModulePlugin implements Plugin<Project> {
649649
def solace_version = "10.21.0"
650650
def spark2_version = "2.4.8"
651651
def spark3_version = "3.5.0"
652+
def spark4_version = "4.0.2"
652653
def spotbugs_version = "4.8.3"
653654
def testcontainers_version = "1.21.4"
654655
// [bomupgrader] determined by: org.apache.arrow:arrow-memory-core, consistent with: google_cloud_platform_libraries_bom
@@ -658,6 +659,7 @@ class BeamModulePlugin implements Plugin<Project> {
658659

659660
// Export Spark versions, so they are defined in a single place only
660661
project.ext.spark3_version = spark3_version
662+
project.ext.spark4_version = spark4_version
661663
// version for BigQueryMetastore catalog (used by sdks:java:io:iceberg:bqms)
662664
// TODO: remove this and download the jar normally when the catalog gets
663665
// open-sourced (https://github.com/apache/iceberg/pull/11039)
@@ -820,6 +822,7 @@ class BeamModulePlugin implements Plugin<Project> {
820822
jackson_datatype_jsr310 : "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:$jackson_version",
821823
jackson_module_scala_2_11 : "com.fasterxml.jackson.module:jackson-module-scala_2.11:$jackson_version",
822824
jackson_module_scala_2_12 : "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jackson_version",
825+
jackson_module_scala_2_13 : "com.fasterxml.jackson.module:jackson-module-scala_2.13:$jackson_version",
823826
jamm : 'com.github.jbellis:jamm:0.4.0',
824827
jaxb_api : "jakarta.xml.bind:jakarta.xml.bind-api:$jaxb_api_version",
825828
jaxb_impl : "com.sun.xml.bind:jaxb-impl:$jaxb_api_version",

runners/spark/job-server/spark_job_server.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@ apply plugin: 'application'
2828
// we need to set mainClassName before applying shadow plugin
2929
mainClassName = "org.apache.beam.runners.spark.SparkJobServerDriver"
3030

31+
def sparkVersion = project.findProperty('spark_version') ?: ''
32+
3133
applyJavaNature(
34+
requireJavaVersion: (sparkVersion.startsWith("4") ? org.gradle.api.JavaVersion.VERSION_17 : null),
3235
automaticModuleName: 'org.apache.beam.runners.spark.jobserver',
3336
archivesBaseName: project.hasProperty('archives_base_name') ? archives_base_name : archivesBaseName,
3437
validateShadowJar: false,

runners/spark/spark_runner.gradle

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,20 @@
1919
import groovy.json.JsonOutput
2020

2121
apply plugin: 'org.apache.beam.module'
22+
23+
// Numeric version comparison (lexicographic string compare was fragile — e.g. "3.10.0" < "3.5.0").
24+
def isSparkAtLeast = { String minVersion ->
25+
def parts = spark_version.tokenize('.-').findAll { it.isInteger() }*.toInteger()
26+
def minParts = minVersion.tokenize('.-').findAll { it.isInteger() }*.toInteger()
27+
for (int i = 0; i < Math.min(parts.size(), minParts.size()); i++) {
28+
if (parts[i] != minParts[i]) return parts[i] > minParts[i]
29+
}
30+
return parts.size() >= minParts.size()
31+
}
32+
2233
applyJavaNature(
2334
enableStrictDependencies: true,
35+
requireJavaVersion: (isSparkAtLeast("4.0.0") ? org.gradle.api.JavaVersion.VERSION_17 : null),
2436
automaticModuleName: 'org.apache.beam.runners.spark',
2537
archivesBaseName: (project.hasProperty('archives_base_name') ? archives_base_name : archivesBaseName),
2638
exportJavadoc: (project.hasProperty('exportJavadoc') ? exportJavadoc : true),
@@ -240,7 +252,7 @@ dependencies {
240252
spark.components.each { component ->
241253
provided "$component:$spark_version"
242254
}
243-
if ("$spark_version" >= "3.5.0") {
255+
if (isSparkAtLeast("3.5.0")) {
244256
implementation "org.apache.spark:spark-common-utils_$spark_scala_version:$spark_version"
245257
implementation "org.apache.spark:spark-sql-api_$spark_scala_version:$spark_version"
246258
}
@@ -270,7 +282,7 @@ dependencies {
270282
testImplementation library.java.mockito_core
271283
testImplementation "org.assertj:assertj-core:3.11.1"
272284
testImplementation "org.apache.zookeeper:zookeeper:3.4.11"
273-
if ("$spark_version" >= "3.5.0") {
285+
if (isSparkAtLeast("3.5.0")) {
274286
testImplementation "org.apache.spark:spark-common-utils_$spark_scala_version:$spark_version"
275287
testImplementation "org.apache.spark:spark-sql-api_$spark_scala_version:$spark_version"
276288
}
@@ -284,7 +296,7 @@ dependencies {
284296
"hadoopVersion$kv.key" "org.apache.hadoop:hadoop-common:$kv.value"
285297
// Force paranamer 2.8 to avoid issues when using Scala 2.12
286298
"hadoopVersion$kv.key" "com.thoughtworks.paranamer:paranamer:2.8"
287-
if ("$spark_version" >= "3.5.0") {
299+
if (isSparkAtLeast("3.5.0")) {
288300
// Add log4j 2.x dependencies as Spark 3.5+ uses slf4j with log4j 2.x backend
289301
"hadoopVersion$kv.key" library.java.log4j2_api
290302
"hadoopVersion$kv.key" library.java.log4j2_core
@@ -310,7 +322,7 @@ configurations.validatesRunner {
310322
// Exclude to make sure log4j binding is used
311323
exclude group: "org.slf4j", module: "slf4j-simple"
312324

313-
if ("$spark_version" >= "3.5.0") {
325+
if (isSparkAtLeast("3.5.0")) {
314326
// Exclude log4j 1.x dependencies to prevent conflict with log4j 2.x used by spark 3.5+
315327
exclude group: "log4j", module: "log4j"
316328
}
@@ -321,7 +333,7 @@ hadoopVersions.each { kv ->
321333
resolutionStrategy {
322334
force "org.apache.hadoop:hadoop-common:$kv.value"
323335
}
324-
if ("$spark_version" >= "3.5.0") {
336+
if (isSparkAtLeast("3.5.0")) {
325337
// Exclude log4j 1.x dependencies to prevent conflict with log4j 2.x used by spark 3.5+
326338
exclude group: "log4j", module: "log4j"
327339
}

runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
import org.slf4j.Logger;
5151
import org.slf4j.LoggerFactory;
5252
import scala.Option;
53-
import scala.collection.JavaConversions;
53+
import scala.collection.JavaConverters;
5454

5555
/** Classes implementing Beam {@link Source} {@link RDD}s. */
5656
@SuppressWarnings({
@@ -75,7 +75,7 @@ public static class Bounded<T> extends RDD<WindowedValue<T>> {
7575

7676
// to satisfy Scala API.
7777
private static final scala.collection.immutable.Seq<Dependency<?>> NIL =
78-
JavaConversions.asScalaBuffer(Collections.<Dependency<?>>emptyList()).toList();
78+
JavaConverters.asScalaBuffer(Collections.<Dependency<?>>emptyList()).toList();
7979

8080
public Bounded(
8181
SparkContext sc,
@@ -148,7 +148,7 @@ public scala.collection.Iterator<WindowedValue<T>> compute(
148148
final Iterator<WindowedValue<T>> readerIterator =
149149
new ReaderToIteratorAdapter<>(metricsContainer, reader);
150150

151-
return new InterruptibleIterator<>(context, JavaConversions.asScalaIterator(readerIterator));
151+
return new InterruptibleIterator<>(context, JavaConverters.asScalaIterator(readerIterator));
152152
}
153153

154154
/**
@@ -299,7 +299,7 @@ public static class Unbounded<T, CheckpointMarkT extends UnboundedSource.Checkpo
299299

300300
// to satisfy Scala API.
301301
private static final scala.collection.immutable.List<Dependency<?>> NIL =
302-
JavaConversions.asScalaBuffer(Collections.<Dependency<?>>emptyList()).toList();
302+
JavaConverters.asScalaBuffer(Collections.<Dependency<?>>emptyList()).toList();
303303

304304
public Unbounded(
305305
SparkContext sc,
@@ -344,7 +344,7 @@ public scala.collection.Iterator<scala.Tuple2<Source<T>, CheckpointMarkT>> compu
344344
(CheckpointableSourcePartition<T, CheckpointMarkT>) split;
345345
scala.Tuple2<Source<T>, CheckpointMarkT> tuple2 =
346346
new scala.Tuple2<>(partition.getSource(), partition.checkpointMark);
347-
return JavaConversions.asScalaIterator(Collections.singleton(tuple2).iterator());
347+
return JavaConverters.asScalaIterator(Collections.singleton(tuple2).iterator());
348348
}
349349
}
350350

runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ public Duration slideDuration() {
186186

187187
@Override
188188
public scala.collection.immutable.List<DStream<?>> dependencies() {
189-
return scala.collection.JavaConversions.asScalaBuffer(
189+
return scala.collection.JavaConverters.asScalaBuffer(
190190
Collections.<DStream<?>>singletonList(parent))
191191
.toList();
192192
}

runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@
7373
import scala.Tuple2;
7474
import scala.Tuple3;
7575
import scala.collection.Iterator;
76-
import scala.collection.JavaConversions;
76+
import scala.collection.JavaConverters;
7777
import scala.collection.Seq;
7878
import scala.runtime.AbstractFunction1;
7979

@@ -238,7 +238,7 @@ private Collection<TimerInternals.TimerData> filterTimersEligibleForProcessing(
238238
// new input for key.
239239
try {
240240
final Iterable<WindowedValue<InputT>> elements =
241-
FluentIterable.from(JavaConversions.asJavaIterable(encodedElements))
241+
FluentIterable.from(JavaConverters.asJavaIterable(encodedElements))
242242
.transform(bytes -> CoderHelpers.fromByteArray(bytes, wvCoder));
243243

244244
LOG.trace("{}: input elements: {}", logPrefix, elements);
@@ -410,7 +410,7 @@ private Collection<TimerInternals.TimerData> filterTimersEligibleForProcessing(
410410
droppedDueToClosedWindow.inc(-droppedDueToClosedWindow.getCumulative());
411411
}
412412

413-
return scala.collection.JavaConversions.asScalaIterator(
413+
return JavaConverters.asScalaIterator(
414414
new UpdateStateByKeyOutputIterator(input, reduceFn, droppedDueToLateness));
415415
}
416416
}

runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.beam.runners.spark.structuredstreaming;
1919

2020
import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
21-
import static org.sparkproject.guava.base.Objects.firstNonNull;
21+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
2222

2323
import java.io.IOException;
2424
import java.util.concurrent.ExecutionException;

runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
1919

20+
import java.io.Serializable;
2021
import java.util.ArrayList;
2122
import java.util.Collections;
2223
import java.util.List;
@@ -49,7 +50,6 @@
4950
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
5051
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
5152
import org.joda.time.Instant;
52-
import scala.Serializable;
5353

5454
/**
5555
* Factory to create a {@link DoFnRunner}. The factory supports fusing multiple {@link DoFnRunner

runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,8 @@ private static <T> void translateFlatten(
330330
}
331331
}
332332
// Unify streams into a single stream.
333-
unifiedStreams = context.getStreamingContext().union(JavaConverters.asScalaBuffer(dStreams));
333+
unifiedStreams =
334+
context.getStreamingContext().union(JavaConverters.asScalaBuffer(dStreams).toList());
334335
}
335336

336337
context.pushDataset(

runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/ParDoStateUpdateFn.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.Serializable;
2121
import java.util.Collection;
22+
import java.util.Collections;
2223
import java.util.Iterator;
2324
import java.util.List;
2425
import java.util.Map;
@@ -62,7 +63,6 @@
6263
import org.checkerframework.checker.nullness.qual.Nullable;
6364
import org.slf4j.Logger;
6465
import org.slf4j.LoggerFactory;
65-
import org.sparkproject.guava.collect.Iterators;
6666
import scala.Option;
6767
import scala.Tuple2;
6868
import scala.runtime.AbstractFunction3;
@@ -236,7 +236,7 @@ public TimerInternals timerInternals() {
236236
final byte[] byteValue = serializedValue.get();
237237
@Nullable WindowedValue<ValueT> windowedValue;
238238
@Nullable WindowedValue<KV<KeyT, ValueT>> keyedWindowedValue;
239-
Iterator<WindowedValue<KV<KeyT, ValueT>>> iterator = Iterators.emptyIterator();
239+
Iterator<WindowedValue<KV<KeyT, ValueT>>> iterator = Collections.emptyIterator();
240240
if (byteValue.length > 0) {
241241
windowedValue = CoderHelpers.fromByteArray(byteValue, this.wvCoder);
242242
keyedWindowedValue = windowedValue.withValue(KV.of(key, windowedValue.getValue()));

0 commit comments

Comments
 (0)