diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java index 500aedecb5d6..d1278eddaef0 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java @@ -63,6 +63,18 @@ public interface DoFnRunner void onWindowExpiration( BoundedWindow window, Instant timestamp, KeyT key); + /** + * Performs per-key cleanup or processing after all elements, timers for a key have been processed + * and before moving to the next key or before finishBundle for the last key. + * + *

This is an optional method that can be used by runners as a hook to reset any per key state + * before moving to a different key in the same bundle. Currently used only by the Dataflow + * Streaming runner. + * + * @param key current key to clean up or finish processing + */ + void finishKey(KeyT key); + /** * Returns the underlying fn instance. * diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java index 41052a76f13e..dfab198f8932 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java @@ -31,6 +31,7 @@ import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; /** @@ -101,6 +102,11 @@ public void finishBundle() { doFnRunner.finishBundle(); } + @Override + public void finishKey(KeyT key) { + doFnRunner.finishKey(key); + } + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) { doFnRunner.onWindowExpiration(window, timestamp, key); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 1825b77b65fb..90d974653b6a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -230,6 +230,9 @@ public void finishBundle() { } } + @Override + public void finishKey(KeyT key) {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) { invoker.invokeOnWindowExpiration( diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java index 779138834669..f5de79652f23 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -131,6 +132,11 @@ public void finishBundle() { doFnRunner.finishBundle(); } + @Override + public void finishKey(KeyT key) { + doFnRunner.finishKey(key); + } + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) { doFnRunner.onWindowExpiration(window, timestamp, key); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java index 1ae937b7a836..aa61122a7547 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java @@ -65,6 +65,7 @@ import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; @@ -379,6 +380,9 @@ public void finishBundle() { finished = true; } + @Override + public void finishKey(KeyT key) {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {} } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java index 56e077253ae6..c327c8d91bea 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; /** @@ -105,6 +106,9 @@ public void finishBundle() { container.updateMetrics(stepName); } + @Override + public void finishKey(KeyT key) {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) { delegate.onWindowExpiration(window, timestamp, key); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java index 4ebb359fceae..f7bc62a8ec5a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java @@ -1064,6 +1064,9 @@ public void finishBundle() { } } + @Override + public void finishKey(KeyT key) {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java index 73b20238ef05..6d1d4085f0bc 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java @@ -255,6 +255,9 @@ public void finishBundle() { Optional.ofNullable(finishBundleCallback).ifPresent(Runnable::run); } + @Override + public void finishKey(KeyT key) {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {} diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java index 5c4975ffab01..cce8f808c242 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java @@ -523,6 +523,9 @@ public void finishBundle() { wrappedRunner.finishBundle(); } + @Override + public void finishKey(KeyT key) {} + @Override public void onWindowExpiration( BoundedWindow window, Instant timestamp, KeyT key) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java index 83cbc3aa62c7..673130a6048a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java @@ -119,6 +119,9 @@ public void processTimers() throws Exception { // Nothing. } + @Override + public void finishKey(Object key) throws Exception {} + @Override public void finishBundle() throws Exception { receiver = null; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeUngroupingParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeUngroupingParDoFn.java index 8e2b325b580a..c3cf6d9e67e2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeUngroupingParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeUngroupingParDoFn.java @@ -73,6 +73,9 @@ public void processTimers() throws Exception { // The timers for the underlying ParDoFn are processed at the end of each element } + @Override + public void finishKey(Object key) throws Exception {} + @Override public void finishBundle() throws Exception { underlyingParDoFn.finishBundle(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CreateIsmShardKeyAndSortKeyDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CreateIsmShardKeyAndSortKeyDoFnFactory.java index bd991560c186..afe2e28b2a67 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CreateIsmShardKeyAndSortKeyDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CreateIsmShardKeyAndSortKeyDoFnFactory.java @@ -114,6 +114,9 @@ public void processElement(Object untypedElem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey(Object key) throws Exception {} + @Override public void finishBundle() {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java index ec1fcd6c8432..400dbc047443 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; /** @@ -128,6 +129,11 @@ public void finishBundle() { simpleRunner.finishBundle(); } + @Override + public void finishKey(KeyT key) { + simpleRunner.finishKey(key); + } + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) { simpleRunner.onWindowExpiration(window, timestamp, key); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ForwardingParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ForwardingParDoFn.java index d55181559322..3a864d6caf2f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ForwardingParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ForwardingParDoFn.java @@ -19,6 +19,7 @@ import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn; import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver; +import org.checkerframework.checker.nullness.qual.Nullable; /** * A base class for {@link ParDoFn} implementations for overriding particular methods while @@ -47,6 +48,11 @@ public void processTimers() throws Exception { delegate.processTimers(); } + @Override + public void finishKey(@Nullable Object key) throws Exception { + delegate.finishKey(key); + } + @Override public void finishBundle() throws Exception { delegate.finishBundle(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java index 4845bb0c98e4..aa60a61af8af 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; /** @@ -102,6 +103,9 @@ private void invokeProcessElement(WindowedValue elem) { @Override public void finishBundle() {} + @Override + public void finishKey(KeyT key) {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java index 882dd497e3f5..e204a78a7d2e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java @@ -142,6 +142,12 @@ public void processTimers() throws Exception { // it here to build a KeyedWorkItem } + @Override + public void finishKey(Object key) throws Exception { + checkState(fnRunner != null); + fnRunner.finishKey(key); + } + @Override public void finishBundle() throws Exception { checkState(fnRunner != null); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PairWithConstantKeyDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PairWithConstantKeyDoFnFactory.java index 6951e3a95b20..425184a4a126 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PairWithConstantKeyDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PairWithConstantKeyDoFnFactory.java @@ -98,6 +98,9 @@ public void processElement(Object untypedElem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey(Object key) throws Exception {} + @Override public void finishBundle() {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java index 399258d7dbb9..a6d7810412a1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java @@ -317,6 +317,9 @@ public void processElement(Object elem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey(Object key) throws Exception {} + @Override public void finishBundle() throws Exception { groupingTable.flush(receiver); @@ -377,6 +380,9 @@ public void processElement(Object elem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey(Object key) throws Exception {} + @Override public void finishBundle() throws Exception { groupingTable.flush(receiver); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java index 746c09404f6e..0438b525b6b9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java @@ -86,6 +86,9 @@ public void processElement(Object untypedElem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey(Object key) throws Exception {} + @Override public void finishBundle() throws Exception { this.receiver = null; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java index 34dff6b88358..e0f1e0f410cd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java @@ -195,6 +195,9 @@ public void processTimers() throws Exception { () -> sideInputProcessor); } + @Override + public void finishKey(Object key) throws Exception {} + @Override public void finishBundle() throws Exception { helpers.finishBundle(sideInputProcessor); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java index 0b9ccd1f37c6..3de7c0f3a9b9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java @@ -39,6 +39,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; /** @@ -155,6 +156,11 @@ public void finishBundle() { sideInputFetcher.persist(); } + @Override + public void finishKey(KeyT key) { + simpleDoFnRunner.finishKey(key); + } + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) { simpleDoFnRunner.onWindowExpiration(window, timestamp, key); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputParDoFn.java index 225bc6af0ea9..63de0b8d55db 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputParDoFn.java @@ -193,6 +193,9 @@ public void processTimers() throws Exception { () -> sideInputProcessor); } + @Override + public void finishKey(Object key) throws Exception {} + @Override public void finishBundle() throws Exception { helpers.finishBundle(sideInputProcessor); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index 00fdf67b8d02..d88864745648 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -282,7 +282,7 @@ public void finishKey() { checkState(!finishKeyCalled, "finishKey was already called"); checkStateNotNull(workExecutor, "workExecutor must be set before calling finishKey()"); try { - workExecutor.finishKey(); + workExecutor.finishKey(key); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterParDoFn.java index 61730b0c8d88..6b51427bb930 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterParDoFn.java @@ -73,6 +73,9 @@ public void processElement(Object element) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey(Object key) throws Exception {} + @Override public void finishBundle() throws Exception {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java index a64d1a970d34..ef1a5922fe5c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; /** @@ -80,6 +81,11 @@ public void onTimer( "Attempt to deliver a timer to a DoFn, but timers are not supported in Dataflow."); } + @Override + public void finishKey(KeyT key) { + simpleDoFnRunner.finishKey(key); + } + @Override public void finishBundle() { simpleDoFnRunner.finishBundle(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ToIsmRecordForMultimapDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ToIsmRecordForMultimapDoFnFactory.java index f9e2d6de2461..261ed69fb5d7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ToIsmRecordForMultimapDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ToIsmRecordForMultimapDoFnFactory.java @@ -149,6 +149,9 @@ public void processElement(Object untypedElem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey(Object key) throws Exception {} + @Override public void finishBundle() {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ValuesDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ValuesDoFnFactory.java index 3ddb3c2003db..acf143a5467b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ValuesDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ValuesDoFnFactory.java @@ -80,6 +80,9 @@ public void processElement(Object untypedElem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey(Object key) throws Exception {} + @Override public void finishBundle() {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java index a1321d57ebb6..3220dbade61c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java @@ -90,7 +90,7 @@ public void execute() throws Exception { } @Override - public void finishKey() throws Exception {} + public void finishKey(Object key) throws Exception {} @Override public SourceOperationResponse getResponse() { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/FlattenOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/FlattenOperation.java index af1b2b9c48bd..7ef011266507 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/FlattenOperation.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/FlattenOperation.java @@ -19,6 +19,7 @@ import java.io.Closeable; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.checkerframework.checker.nullness.qual.Nullable; /** A flatten operation. */ public class FlattenOperation extends ReceivingOperation { @@ -44,7 +45,7 @@ public void process(Object elem) throws Exception { } @Override - public void finishKey() throws Exception {} + public void finishKey(@Nullable Object key) throws Exception {} @Override public boolean supportsRestart() { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java index c6e1ae209b98..90fc76276940 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java @@ -110,9 +110,9 @@ public void execute() throws Exception { } @Override - public void finishKey() throws Exception { + public void finishKey(@Nullable Object key) throws Exception { for (Operation op : operations) { - op.finishKey(); + op.finishKey(key); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/Operation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/Operation.java index b630da33cfad..138ce525e26b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/Operation.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/Operation.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.dataflow.worker.util.common.worker; +import org.checkerframework.checker.nullness.qual.Nullable; + /** * The abstract base class for Operations, which correspond to Instructions in the original MapTask * InstructionGraph. @@ -138,7 +140,7 @@ public void finish() throws Exception { } /** Called when all elements for a specific key have been processed. */ - public abstract void finishKey() throws Exception; + public abstract void finishKey(@Nullable Object key) throws Exception; /** Aborts this Operation's execution. */ public void abort() throws Exception { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoFn.java index 84dbbd627b08..75ecdb67dd42 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoFn.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.dataflow.worker.util.common.worker; +import org.checkerframework.checker.nullness.qual.Nullable; + /** * Interface for functions invokable by {@link ParDoOperation} instances. * @@ -30,6 +32,8 @@ public interface ParDoFn { void processTimers() throws Exception; + void finishKey(@Nullable Object key) throws Exception; + void finishBundle() throws Exception; void abort() throws Exception; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperation.java index 68f5fbe688da..a814b90cae21 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperation.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperation.java @@ -19,6 +19,7 @@ import java.io.Closeable; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.checkerframework.checker.nullness.qual.Nullable; /** A ParDo mapping function. */ public class ParDoOperation extends ReceivingOperation { @@ -48,10 +49,11 @@ public void process(Object elem) throws Exception { // Batch mode does not use this method and instead relies on BatchModeUngroupingParDoFn // to process timers per key. @Override - public void finishKey() throws Exception { + public void finishKey(@Nullable Object key) throws Exception { try (Closeable scope = context.enterProcessTimers()) { checkStarted(); fn.processTimers(); + fn.finishKey(key); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java index fabc8d6af25b..5d118626f18d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java @@ -272,7 +272,7 @@ public void finish() throws Exception { } @Override - public void finishKey() throws Exception {} + public void finishKey(@Nullable Object key) throws Exception {} @Override public void abort() throws Exception { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/SimplePartialGroupByKeyParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/SimplePartialGroupByKeyParDoFn.java index 7a9fcfdf0694..cce77f3aceb6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/SimplePartialGroupByKeyParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/SimplePartialGroupByKeyParDoFn.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.dataflow.worker.util.common.worker; +import org.checkerframework.checker.nullness.qual.Nullable; + /** A partial group-by-key {@link ParDoFn} implementation. */ public class SimplePartialGroupByKeyParDoFn implements ParDoFn { private final GroupingTable groupingTable; @@ -39,6 +41,9 @@ public void processElement(Object elem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey(@Nullable Object key) throws Exception {} + @Override public void finishBundle() throws Exception { groupingTable.flush(receiver); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkExecutor.java index 1083fdbb9c42..8aab050fb22c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkExecutor.java @@ -35,7 +35,7 @@ public interface WorkExecutor extends AutoCloseable { public abstract void execute() throws Exception; /** Called when all elements for a specific key have been processed. */ - void finishKey() throws Exception; + void finishKey(@Nullable Object key) throws Exception; /** * Returns the worker's current progress. diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WriteOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WriteOperation.java index d28e7f3e5d3d..a97c9920b9a3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WriteOperation.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WriteOperation.java @@ -21,6 +21,7 @@ import org.apache.beam.runners.dataflow.worker.counters.Counter; import org.apache.beam.runners.dataflow.worker.counters.CounterName; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; +import org.checkerframework.checker.nullness.qual.Nullable; /** A write operation. */ @SuppressWarnings({ @@ -106,7 +107,7 @@ public void finish() throws Exception { } @Override - public void finishKey() throws Exception {} + public void finishKey(@Nullable Object key) throws Exception {} @Override public void abort() throws Exception { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java index 396c8db87e6b..73d69bd0f617 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java @@ -106,7 +106,7 @@ public void abort() throws Exception { } @Override - public void finishKey() throws Exception {} + public void finishKey(Object key) throws Exception {} } // A mock ReadOperation fed to a MapTaskExecutor in test. @@ -220,6 +220,9 @@ public void finishBundle() {} @Override public void abort() {} + + @Override + public void finishKey(Object key) throws Exception {} } /** Verify counts for the per-element-output-time counter are correct. */ @@ -317,7 +320,7 @@ public void start() throws Exception { } @Override - public void finishKey() throws Exception {} + public void finishKey(Object key) throws Exception {} }, new Operation(new OutputReceiver[] {}, context2) { @Override @@ -329,7 +332,7 @@ public void start() throws Exception { } @Override - public void finishKey() throws Exception {} + public void finishKey(Object key) throws Exception {} }, new Operation(new OutputReceiver[] {}, context3) { @Override @@ -341,7 +344,7 @@ public void start() throws Exception { } @Override - public void finishKey() throws Exception {} + public void finishKey(Object key) throws Exception {} }); try (IntrinsicMapTaskExecutor executor = diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java index d18bc512723e..c110cc0d2bf7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java @@ -150,6 +150,7 @@ public void testSideInputNotReady() throws Exception { runner.startBundle(); runner.processElement(createDatum("e", 0)); + runner.finishKey("key"); runner.finishBundle(); assertTrue(outputManager.getOutput(mainOutputTag).isEmpty()); @@ -214,6 +215,7 @@ public void testMultipleWindowsNotReady() throws Exception { runner.startBundle(); runner.processElement(elem); + runner.finishKey("key"); runner.finishBundle(); assertTrue(outputManager.getOutput(mainOutputTag).isEmpty()); @@ -317,6 +319,7 @@ public void testSideInputNotification() throws Exception { when(mockSideInputReader.get(eq(view), any(BoundedWindow.class))).thenReturn("data"); runner.startBundle(); + runner.finishKey("key"); runner.finishBundle(); assertThat(outputManager.getOutput(mainOutputTag), contains(createDatum("e:data", 0))); @@ -373,6 +376,7 @@ public void testMultipleSideInputs() throws Exception { runner.startBundle(); runner.processElement(createDatum("e2", 2)); + runner.finishKey("key"); runner.finishBundle(); assertThat( diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutorTestUtils.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutorTestUtils.java index d5e3b9c87139..ac7c787b1d26 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutorTestUtils.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutorTestUtils.java @@ -61,7 +61,7 @@ private static OutputReceiver[] createOutputReceivers(int numOutputs, CounterSet } @Override - public void finishKey() throws Exception {} + public void finishKey(Object key) throws Exception {} } /** A {@code Reader} that yields a specified set of values. */ diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java index 5d8f8eebb6f6..40f4c7b0e715 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java @@ -102,7 +102,7 @@ public void abort() throws Exception { } @Override - public void finishKey() throws Exception {} + public void finishKey(Object key) throws Exception {} } // A mock ReadOperation fed to a MapTaskExecutor in test. @@ -216,6 +216,9 @@ public void finishBundle() {} @Override public void abort() {} + + @Override + public void finishKey(Object key) throws Exception {} } /** Verify counts for the per-element-output-time counter are correct. */ @@ -314,7 +317,7 @@ public void start() throws Exception { } @Override - public void finishKey() throws Exception {} + public void finishKey(Object key) throws Exception {} }, new Operation(new OutputReceiver[] {}, context2) { @Override @@ -326,7 +329,7 @@ public void start() throws Exception { } @Override - public void finishKey() throws Exception {} + public void finishKey(Object key) throws Exception {} }, new Operation(new OutputReceiver[] {}, context3) { @Override @@ -338,7 +341,7 @@ public void start() throws Exception { } @Override - public void finishKey() throws Exception {} + public void finishKey(Object key) throws Exception {} }); assertEquals(TimeUnit.MINUTES.toMillis(10), stateTracker.getNextBundleLullDurationReportMs()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperationTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperationTest.java index 5d058b1968cb..0c9667acf38b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperationTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperationTest.java @@ -85,6 +85,9 @@ public void finishBundle() throws Exception { public void abort() throws Exception { outputReceiver.process("a-aborted"); } + + @Override + public void finishKey(Object key) throws Exception {} } @Test @@ -104,7 +107,7 @@ public void testRunParDoOperation() throws Exception { parDoOperation.process(""); parDoOperation.process("bob"); - parDoOperation.finishKey(); + parDoOperation.finishKey("key"); parDoOperation.finish(); parDoOperation.abort(); @@ -148,7 +151,7 @@ public void testParDoOperationContext() throws Exception { operation.start(); operation.process("hello"); - operation.finishKey(); + operation.finishKey("key"); operation.finish(); InOrder inOrder = @@ -163,6 +166,7 @@ public void testParDoOperationContext() throws Exception { inOrder.verify(processCloseable).close(); inOrder.verify(context).enterProcessTimers(); inOrder.verify(fn).processTimers(); + inOrder.verify(fn).finishKey("key"); inOrder.verify(processTimersCloseable).close(); inOrder.verify(context).enterFinish(); inOrder.verify(fn).finishBundle(); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java index 99ce3dc69889..5e8703a05b06 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java @@ -49,6 +49,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; /** @@ -284,6 +285,9 @@ public void finishBundle() { } } + @Override + public void finishKey(KeyT key) {} + @Override public DoFn getFn() { throw new UnsupportedOperationException(); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java index 28dbf44cb8fe..7202de0f0aa8 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; /** DoFnRunner decorator which registers {@link MetricsContainer}. */ @@ -104,6 +105,9 @@ public void finishBundle() { } } + @Override + public void finishKey(KeyT key) {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) { delegate.onWindowExpiration(window, timestamp, key); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java index c8cd7eb5f262..bc434b2117de 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; /** DoFnRunner decorator which registers {@link MetricsContainerImpl}. */ @@ -103,6 +104,9 @@ public void finishBundle() { } } + @Override + public void finishKey(KeyT key) {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) { delegate.onWindowExpiration(window, timestamp, key); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java index 2dc428d5a6b2..35fe7b745ea3 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; @@ -259,6 +260,9 @@ public void onTimer( @Override public void finishBundle() {} + @Override + public void finishKey(KeyT key) {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {}