Skip to content

Commit 01e554f

Browse files
committed
Fix build
1 parent 08a7f7d commit 01e554f

4 files changed

Lines changed: 24 additions & 0 deletions

File tree

runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,16 @@ public void finishBundle() {
105105
container.updateMetrics(stepName);
106106
}
107107

108+
@Override
109+
public void finishKey() {
110+
try (Closeable ignored =
111+
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) {
112+
delegate.finishKey();
113+
} catch (IOException e) {
114+
throw new RuntimeException(e);
115+
}
116+
}
117+
108118
@Override
109119
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
110120
delegate.onWindowExpiration(window, timestamp, key);

runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,6 +1064,11 @@ public void finishBundle() {
10641064
}
10651065
}
10661066

1067+
@Override
1068+
public void finishKey() {
1069+
// Do nothing by default.
1070+
}
1071+
10671072
@Override
10681073
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {}
10691074

runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,11 @@ public void finishBundle() {
255255
Optional.ofNullable(finishBundleCallback).ifPresent(Runnable::run);
256256
}
257257

258+
@Override
259+
public void finishKey() {
260+
// Do nothing by default.
261+
}
262+
258263
@Override
259264
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {}
260265

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ public void testSideInputNotReady() throws Exception {
150150

151151
runner.startBundle();
152152
runner.processElement(createDatum("e", 0));
153+
runner.finishKey();
153154
runner.finishBundle();
154155

155156
assertTrue(outputManager.getOutput(mainOutputTag).isEmpty());
@@ -214,6 +215,7 @@ public void testMultipleWindowsNotReady() throws Exception {
214215

215216
runner.startBundle();
216217
runner.processElement(elem);
218+
runner.finishKey();
217219
runner.finishBundle();
218220

219221
assertTrue(outputManager.getOutput(mainOutputTag).isEmpty());
@@ -317,6 +319,7 @@ public void testSideInputNotification() throws Exception {
317319
when(mockSideInputReader.get(eq(view), any(BoundedWindow.class))).thenReturn("data");
318320

319321
runner.startBundle();
322+
runner.finishKey();
320323
runner.finishBundle();
321324

322325
assertThat(outputManager.getOutput(mainOutputTag), contains(createDatum("e:data", 0)));
@@ -373,6 +376,7 @@ public void testMultipleSideInputs() throws Exception {
373376

374377
runner.startBundle();
375378
runner.processElement(createDatum("e2", 2));
379+
runner.finishKey();
376380
runner.finishBundle();
377381

378382
assertThat(

0 commit comments

Comments
 (0)