Skip to content

Commit 578cb09

Browse files
committed
fix npe in ReduceRunner.
1 parent a05482e commit 578cb09

3 files changed

Lines changed: 9 additions & 2 deletions

File tree

runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ public void processElements(Iterable<WindowedValue<InputT>> values) throws Excep
376376
emit(
377377
contextFactory.base(window, StateStyle.DIRECT),
378378
contextFactory.base(window, StateStyle.RENAMED),
379-
null);
379+
CausedByDrain.NORMAL);
380380
}
381381

382382
// We're all done with merging and emitting elements so can compress the activeWindow state.

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,11 +445,13 @@ public <T> void output(TupleTag<T> tag, T output) {
445445

446446
@Override
447447
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
448+
checkTimestamp(elem.getTimestamp(), windowedValue.getTimestamp());
448449
SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue);
449450
}
450451

451452
@Override
452453
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
454+
checkTimestamp(elem.getTimestamp(), windowedValue.getTimestamp());
453455
SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue);
454456
}
455457

@@ -1039,11 +1041,13 @@ public <T> void outputWindowedValue(
10391041

10401042
@Override
10411043
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
1044+
checkTimestamp(timestamp(), windowedValue.getTimestamp());
10421045
SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue);
10431046
}
10441047

10451048
@Override
10461049
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
1050+
checkTimestamp(timestamp(), windowedValue.getTimestamp());
10471051
SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue);
10481052
}
10491053

@@ -1308,11 +1312,13 @@ public <T> void outputWindowedValue(
13081312

13091313
@Override
13101314
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
1315+
checkTimestamp(this.timestamp, windowedValue.getTimestamp());
13111316
SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue);
13121317
}
13131318

13141319
@Override
13151320
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
1321+
checkTimestamp(this.timestamp, windowedValue.getTimestamp());
13161322
SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue);
13171323
}
13181324

sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ public static <T> WindowedValue<T> of(
272272
CausedByDrain causedByDrain) {
273273
checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null");
274274
checkArgument(windows.size() > 0, "WindowedValue requires windows, but there were none");
275-
275+
checkArgument(causedByDrain != null, "WindowedValue requires CausedByDrain, but it was null");
276276
if (windows.size() == 1) {
277277
return of(
278278
value,
@@ -322,6 +322,7 @@ public static <T> WindowedValue<T> of(
322322
@Nullable Long currentRecordOffset,
323323
CausedByDrain causedByDrain) {
324324
checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null");
325+
checkArgument(causedByDrain != null, "WindowedValue requires CausedByDrain, but it was null");
325326

326327
boolean isGlobal = GlobalWindow.INSTANCE.equals(window);
327328
if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) {

0 commit comments

Comments
 (0)