What happened?
Summary
Unable to use Beam with the Spark runner on Dataproc. The WordCount example worked fine on Dataproc 2.0 but fails on Dataproc 2.1 with the following error:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/beam/sdk/coders/CoderProviderRegistrar
at java.base/java.lang.ClassLoader.defineClass1(Native Method)
at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1022)
at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(BuiltinClassLoader.java:800)
at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(BuiltinClassLoader.java:698)
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(BuiltinClassLoader.java:621)
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:579)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:581)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1210)
at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1221)
at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1265)
at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1300)
at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1385)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators.addAll(Iterators.java:366)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists.newArrayList(Lists.java:146)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists.newArrayList(Lists.java:132)
at org.apache.beam.sdk.coders.CoderRegistry.<clinit>(CoderRegistry.java:168)
at org.apache.beam.sdk.Pipeline.getCoderRegistry(Pipeline.java:334)
at org.apache.beam.sdk.values.PCollection.finishSpecifyingOutput(PCollection.java:94)
at org.apache.beam.sdk.runners.TransformHierarchy.setOutput(TransformHierarchy.java:173)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:546)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:479)
at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:175)
at org.apache.beam.sdk.io.Read$Bounded.expand(Read.java:150)
at org.apache.beam.sdk.io.Read$Bounded.expand(Read.java:134)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:545)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:496)
at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:56)
at org.apache.beam.sdk.io.TextIO$Read.expand(TextIO.java:413)
at org.apache.beam.sdk.io.TextIO$Read.expand(TextIO.java:275)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:545)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:496)
at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:56)
at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:190)
at org.apache.beam.examples.WordCount.runWordCount(WordCount.java:201)
at org.apache.beam.examples.WordCount.main(WordCount.java:213)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:973)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1061)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1070)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.beam.sdk.coders.CoderProviderRegistrar
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
... 53 more
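As a quick sanity check (illustrative only; assumes the bundled jar produced by the build step in the reproduction below), the missing class can be looked for in the shaded jar that gets submitted:

# Look inside the bundled jar for the class the driver fails to load
jar tf target/word-count-beam-bundled-0.1.jar | grep org/apache/beam/sdk/coders/CoderProviderRegistrar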
One major difference between the two images is that Dataproc 2.1 uses Java 11. We tried updating the Maven plugins to their latest versions, setting the matching Spark/Hadoop versions, and setting the Maven compiler plugin's target/release to Java 11, but the result is the same (see the updated POM). A sketch of that compiler configuration is shown below.
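For reference, the compiler settings we tried look roughly like this (the plugin version shown is illustrative, not necessarily the one in the actual POM):

<!-- maven-compiler-plugin configured for Java 11 (sketch; version is an assumption) -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-compiler-plugin</artifactId>
  <version>3.11.0</version>
  <configuration>
    <release>11</release>
  </configuration>
</plugin>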
Reproduction Steps
- Create a DP 2.1 cluster:
CLUSTER_NAME=dp-21
REGION=us-central1
DATAPROC_IMAGE_VERSION=2.1-ubuntu20
BEAM_VERSION=2.37.0
PROJECT="<PROJECT_ID>"
gcloud dataproc clusters create $CLUSTER_NAME \
--project=$PROJECT \
--optional-components=DOCKER \
--image-version=$DATAPROC_IMAGE_VERSION \
--region=$REGION \
--enable-component-gateway \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--properties spark:spark.master.rest.enabled=true
- Clone the Beam quickstart template according to the public docs:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.51.0 \
-DgroupId=org.example \
-DartifactId=word-count-beam \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
- Add the following dependency to pom.xml (needed on Java 11):
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
<version>2.13.3</version>
</dependency>
- Build for SparkRunner:
mvn package -Pspark-runner
- Run the resulting jar on the DP cluster as a Spark job:
gcloud dataproc jobs submit spark --project=$PROJECT --cluster=$CLUSTER_NAME --region=$REGION --class=org.apache.beam.examples.WordCount --jars=target/word-count-beam-bundled-0.1.jar -- --runner=SparkRunner --output=gs://path/to/output
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components