From 6eae8762b9e50bff30bcaf91fd29d63ec57c0a5f Mon Sep 17 00:00:00 2001
From: Radek Stankiewicz
Date: Mon, 30 Mar 2026 13:07:58 +0200
Subject: [PATCH 1/2] [#37994] Fix NullPointerException in Spark Runner with
 multiple outputs and serialization

---
 .../translation/TransformTranslator.java     |  4 ++
 .../spark/translation/TranslationUtils.java  | 10 ++-
 .../StatefulStreamingParDoEvaluator.java     |  4 ++
 .../StreamingTransformTranslator.java        |  5 ++
 .../translation/TransformTranslatorTest.java | 64 +++++++++++++++++++
 5 files changed, 85 insertions(+), 2 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 5362beba09dc..11b73b189269 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -27,6 +27,7 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Objects;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
@@ -486,6 +487,9 @@ public void evaluate(
               TranslationUtils.getTupleTagCoders(outputs);
           all =
               all.mapToPair(TranslationUtils.getTupleTagEncodeFunction(coderMap))
+                  .filter(
+                      Objects::nonNull) // skip nulls to save on encoding; nulls are tags that are
+                  // not read
                   .persist(level)
                   .mapToPair(TranslationUtils.getTupleTagDecodeFunction(coderMap));
         }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index f5c3fd932742..9d3a0567f5cc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -445,8 +445,14 @@ public static Map<TupleTag<?>, Coder<WindowedValue<?>>> getTupleTagCoders(
     return tuple2 -> {
       TupleTag tupleTag = tuple2._1;
       WindowedValue windowedValue = tuple2._2;
-      return new Tuple2<>(
-          tupleTag, ValueAndCoderLazySerializable.of(windowedValue, coderMap.get(tupleTag)));
+      Coder<WindowedValue<?>> coder = coderMap.get(tupleTag);
+      if (coder == null) {
+        // there is no coder because this output is unconsumed and not read anywhere,
+        // so its coder was pruned from coderMap;
+        // returning null lets callers filter this entry out before encoding
+        return null;
+      }
+      return new Tuple2<>(tupleTag, ValueAndCoderLazySerializable.of(windowedValue, coder));
     };
   }

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StatefulStreamingParDoEvaluator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StatefulStreamingParDoEvaluator.java
index bf500f36c2cb..d1be8210eec1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StatefulStreamingParDoEvaluator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StatefulStreamingParDoEvaluator.java
@@ -26,6 +26,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
@@ -234,6 +235,9 @@ public void evaluate(
               TranslationUtils.getTupleTagCoders(outputs);
           all =
               all.mapToPair(TranslationUtils.getTupleTagEncodeFunction(coderMap))
+                  .filter(
+                      Objects
+                          ::nonNull) // skip nulls to save on encoding; nulls are tags that are not read
                   .cache()
                   .mapToPair(TranslationUtils.getTupleTagDecodeFunction(coderMap));

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 9534b352f200..48697a3dbafc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -27,6 +27,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.stream.Collectors;
@@ -593,6 +594,10 @@ public void evaluate(
               TranslationUtils.getTupleTagCoders(outputs);
           all =
               all.mapToPair(TranslationUtils.getTupleTagEncodeFunction(coderMap))
+                  .filter(
+                      Objects
+                          ::nonNull) // skip nulls to save on encoding; nulls are tags that are
+                  // not read
                   .cache()
                   .mapToPair(TranslationUtils.getTupleTagDecodeFunction(coderMap));
         }
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
index f5ad228152fc..2c38b6361325 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
@@ -40,7 +40,9 @@
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.KV;
@@ -48,6 +50,7 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.sdk.values.WindowedValues;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
@@ -247,4 +250,65 @@ public void testMultipleOutputParDoShouldHaveFilterWhenSideOutputIsConsumed() {
       assertTrue(parsed.stream().anyMatch(e -> e.getName().contains(tag.getId())));
     }
   }
+
+  @Test
+  public void testMultipleOutputParDoWithUnconsumedSideOutputAndSerializationStorageLevel() {
+    Pipeline p = Pipeline.create();
+    TupleTag<String> tag1 = new TupleTag<String>("tag1") {};
+    TupleTag<String> tag2 = new TupleTag<String>("tag2") {};
+    TupleTag<String> tag3 = new TupleTag<String>("tag3") {};
+
+    SparkPipelineOptions options = contextRule.createPipelineOptions();
+    // Force serialization by setting storage level to MEMORY_AND_DISK_SER
+    options.setStorageLevel("MEMORY_AND_DISK_SER");
+
+    TransformTranslator.Translator translator = new TransformTranslator.Translator();
+
+    PTransform<PBegin, PCollection<String>> createTransform = Create.of("foo", "bar");
+
+    PCollectionTuple pCollectionTuple =
+        p.apply("Create Values", createTransform)
+            .apply(
+                "Multiple Output ParDo",
+                ParDo.of(new MultiOutputDoFn(tag1, tag2, tag3))
+                    .withOutputTags(tag1, TupleTagList.of(tag2).and(tag3)));
+
+    // consume tag1 and tag2
+    pCollectionTuple.get(tag1).apply("Count1", Count.globally());
+    pCollectionTuple.get(tag2).apply("Count2", Count.globally());
+
+    p.replaceAll(SparkTransformOverrides.getDefaultOverrides(false));
+
+    EvaluationContext ctxt = new EvaluationContext(contextRule.getSparkContext(), p, options);
+    SparkRunner.initAccumulators(options, ctxt.getSparkContext());
+    SparkRunner.updateDependentTransforms(p, translator, ctxt);
+
+    // This should not throw NullPointerException
+    p.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
+
+    // Also trigger some action on the RDD to ensure serialization happens
+    @SuppressWarnings("unchecked")
+    BoundedDataset<String> dataset =
+        (BoundedDataset<String>) ctxt.borrowDataset(pCollectionTuple.get(tag1));
+    dataset.getRDD().count();
+  }
+
+  private static class MultiOutputDoFn extends DoFn<String, String> {
+    private final TupleTag<String> tag1;
+    private final TupleTag<String> tag2;
+    private final TupleTag<String> tag3;
+
+    MultiOutputDoFn(TupleTag<String> tag1, TupleTag<String> tag2, TupleTag<String> tag3) {
+      this.tag1 = tag1;
+      this.tag2 = tag2;
+      this.tag3 = tag3;
+    }
+
+    @ProcessElement
+    public void process(@Element String input, MultiOutputReceiver outputReceiver) {
+      outputReceiver.get(tag1).output(input);
+      outputReceiver.get(tag2).output(input);
+      outputReceiver.get(tag3).output(input);
+    }
+  }
 }

From 5573fbc995102fe2b87f64df4833dc6c605f295a Mon Sep 17 00:00:00 2001
From: Radek Stankiewicz
Date: Tue, 31 Mar 2026 17:21:26 +0200
Subject: [PATCH 2/2] trigger test.

---
 .../beam_PostCommit_Java_ValidatesRunner_Spark.json | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark.json
index f0c7c2ae3cfd..f4e3f8b8dbe2 100644
--- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark.json
+++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark.json
@@ -13,5 +13,6 @@
   "https://github.com/apache/beam/pull/34155": "noting that PR #34155 should run this test",
   "https://github.com/apache/beam/pull/34560": "noting that PR #34560 should run this test",
   "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface",
-  "https://github.com/apache/beam/pull/35316": "noting that PR #35316 should run this test"
+  "https://github.com/apache/beam/pull/35316": "noting that PR #35316 should run this test",
+  "https://github.com/apache/beam/pull/38011": "noting that PR #38011 should run this test"
 }