Skip to content

Commit a88686c

Browse files
authored
Add DoFnRunner::finishKey() method (#38454)
Currently the method is a no-op. In upcoming changes there will be multiple streaming work items in a single Beam bundle. With multiple work items, we have to process the elements and timers of each work item before moving on to the next work item. finishKey() will be called by the NativeIterator classes after iterating through all elements from a work item.
1 parent 56a4142 commit a88686c

46 files changed

Lines changed: 180 additions & 21 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,18 @@ public interface DoFnRunner<InputT extends @Nullable Object, OutputT extends @Nu
6363
<KeyT extends @Nullable Object> void onWindowExpiration(
6464
BoundedWindow window, Instant timestamp, KeyT key);
6565

66+
/**
67+
* Performs per-key cleanup or processing after all elements and timers for a key have been processed
68+
* and before moving to the next key or before finishBundle for the last key.
69+
*
70+
* <p>This is an optional method that can be used by runners as a hook to reset any per-key state
71+
* before moving to a different key in the same bundle. Currently used only by the Dataflow
72+
* Streaming runner.
73+
*
74+
* @param key current key to clean up or finish processing
75+
*/
76+
<KeyT extends @Nullable Object> void finishKey(KeyT key);
77+
6678
/**
6779
* Returns the underlying fn instance.
6880
*

runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.beam.sdk.values.WindowedValues;
3232
import org.apache.beam.sdk.values.WindowingStrategy;
3333
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
34+
import org.checkerframework.checker.nullness.qual.Nullable;
3435
import org.joda.time.Instant;
3536

3637
/**
@@ -101,6 +102,11 @@ public void finishBundle() {
101102
doFnRunner.finishBundle();
102103
}
103104

105+
@Override
106+
public <KeyT extends @Nullable Object> void finishKey(KeyT key) {
107+
doFnRunner.finishKey(key);
108+
}
109+
104110
@Override
105111
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
106112
doFnRunner.onWindowExpiration(window, timestamp, key);

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,9 @@ public void finishBundle() {
230230
}
231231
}
232232

233+
@Override
234+
public <KeyT extends @Nullable Object> void finishKey(KeyT key) {}
235+
233236
@Override
234237
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
235238
invoker.invokeOnWindowExpiration(

runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.beam.sdk.values.WindowedValues;
4444
import org.apache.beam.sdk.values.WindowingStrategy;
4545
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
46+
import org.checkerframework.checker.nullness.qual.Nullable;
4647
import org.joda.time.Duration;
4748
import org.joda.time.Instant;
4849

@@ -131,6 +132,11 @@ public void finishBundle() {
131132
doFnRunner.finishBundle();
132133
}
133134

135+
@Override
136+
public <KeyT extends @Nullable Object> void finishKey(KeyT key) {
137+
doFnRunner.finishKey(key);
138+
}
139+
134140
@Override
135141
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
136142
doFnRunner.onWindowExpiration(window, timestamp, key);

runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.apache.beam.sdk.values.WindowingStrategy;
6666
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
6767
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
68+
import org.checkerframework.checker.nullness.qual.Nullable;
6869
import org.joda.time.Duration;
6970
import org.joda.time.Instant;
7071
import org.junit.Before;
@@ -379,6 +380,9 @@ public void finishBundle() {
379380
finished = true;
380381
}
381382

383+
@Override
384+
public <KeyT extends @Nullable Object> void finishKey(KeyT key) {}
385+
382386
@Override
383387
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {}
384388
}

runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
2828
import org.apache.beam.sdk.values.CausedByDrain;
2929
import org.apache.beam.sdk.values.WindowedValue;
30+
import org.checkerframework.checker.nullness.qual.Nullable;
3031
import org.joda.time.Instant;
3132

3233
/**
@@ -105,6 +106,9 @@ public void finishBundle() {
105106
container.updateMetrics(stepName);
106107
}
107108

109+
@Override
110+
public <KeyT extends @Nullable Object> void finishKey(KeyT key) {}
111+
108112
@Override
109113
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
110114
delegate.onWindowExpiration(window, timestamp, key);

runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,6 +1064,9 @@ public void finishBundle() {
10641064
}
10651065
}
10661066

1067+
@Override
1068+
public <KeyT extends @Nullable Object> void finishKey(KeyT key) {}
1069+
10671070
@Override
10681071
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {}
10691072

runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,9 @@ public void finishBundle() {
255255
Optional.ofNullable(finishBundleCallback).ifPresent(Runnable::run);
256256
}
257257

258+
@Override
259+
public <KeyT extends @Nullable Object> void finishKey(KeyT key) {}
260+
258261
@Override
259262
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {}
260263

runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,9 @@ public void finishBundle() {
523523
wrappedRunner.finishBundle();
524524
}
525525

526+
@Override
527+
public <KeyT extends @Nullable Object> void finishKey(KeyT key) {}
528+
526529
@Override
527530
public <KeyT> void onWindowExpiration(
528531
BoundedWindow window, Instant timestamp, KeyT key) {

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,9 @@ public void processTimers() throws Exception {
119119
// Nothing.
120120
}
121121

122+
@Override
123+
public void finishKey(Object key) throws Exception {}
124+
122125
@Override
123126
public void finishBundle() throws Exception {
124127
receiver = null;

0 commit comments

Comments
 (0)