Skip to content

Commit 08a7f7d

Browse files
committed
Add DoFnRunner::finishKey method.
In upcoming changes there'll be multiple dataflow streaming work items in a single beam bundle. With multiple work items, we've to process elements and timers of each work item before moving to the next work items. The new finishKey method allows the DoFnRunners to cleanup/persist state (that should not be carried over) before switching work items on multi key bundles. Streaming SideInputDoFnRunners are the only classes using the finishKey method right now. The finishKey() is not exposed to DoFns and is not visible in user apis.
1 parent d5ef9e5 commit 08a7f7d

27 files changed

Lines changed: 128 additions & 1 deletion

runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,12 @@ public interface DoFnRunner<InputT extends @Nullable Object, OutputT extends @Nu
6363
<KeyT extends @Nullable Object> void onWindowExpiration(
6464
BoundedWindow window, Instant timestamp, KeyT key);
6565

66+
/**
67+
* Performs per-key cleanup or processing after all elements and timers for a key have been
68+
* processed.
69+
*/
70+
void finishKey();
71+
6672
/**
6773
* Returns the underlying fn instance.
6874
*

runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ public void finishBundle() {
101101
doFnRunner.finishBundle();
102102
}
103103

104+
@Override
105+
public void finishKey() {
106+
doFnRunner.finishKey();
107+
}
108+
104109
@Override
105110
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
106111
doFnRunner.onWindowExpiration(window, timestamp, key);

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,11 @@ public void finishBundle() {
229229
}
230230
}
231231

232+
@Override
233+
public void finishKey() {
234+
// Do nothing by default.
235+
}
236+
232237
@Override
233238
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
234239
invoker.invokeOnWindowExpiration(

runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,11 @@ public void finishBundle() {
131131
doFnRunner.finishBundle();
132132
}
133133

134+
@Override
135+
public void finishKey() {
136+
doFnRunner.finishKey();
137+
}
138+
134139
@Override
135140
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
136141
doFnRunner.onWindowExpiration(window, timestamp, key);

runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,9 @@ public void finishBundle() {
379379
finished = true;
380380
}
381381

382+
@Override
383+
public void finishKey() {}
384+
382385
@Override
383386
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {}
384387
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,11 @@ public void processTimers() throws Exception {
119119
// Nothing.
120120
}
121121

122+
@Override
123+
public void finishKey() throws Exception {
124+
// Nothing.
125+
}
126+
122127
@Override
123128
public void finishBundle() throws Exception {
124129
receiver = null;

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeUngroupingParDoFn.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ public void processTimers() throws Exception {
7373
// The timers for the underlying ParDoFn are processed at the end of each element
7474
}
7575

76+
@Override
77+
public void finishKey() throws Exception {
78+
// Nothing.
79+
}
80+
7681
@Override
7782
public void finishBundle() throws Exception {
7883
underlyingParDoFn.finishBundle();

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CreateIsmShardKeyAndSortKeyDoFnFactory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,11 @@ public void processElement(Object untypedElem) throws Exception {
114114
@Override
115115
public void processTimers() {}
116116

117+
@Override
118+
public void finishKey() throws Exception {
119+
// Nothing.
120+
}
121+
117122
@Override
118123
public void finishBundle() {}
119124

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,11 @@ public void finishBundle() {
128128
simpleRunner.finishBundle();
129129
}
130130

131+
@Override
132+
public void finishKey() {
133+
simpleRunner.finishKey();
134+
}
135+
131136
@Override
132137
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
133138
simpleRunner.onWindowExpiration(window, timestamp, key);

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ForwardingParDoFn.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ public void processTimers() throws Exception {
4747
delegate.processTimers();
4848
}
4949

50+
@Override
51+
public void finishKey() throws Exception {
52+
delegate.finishKey();
53+
}
54+
5055
@Override
5156
public void finishBundle() throws Exception {
5257
delegate.finishBundle();

0 commit comments

Comments
 (0)