Skip to content

Commit 7a5c982

Browse files
committed
review feedback - remove duplicated code and unused variables
1 parent 80c7a49 commit 7a5c982

4 files changed

Lines changed: 5 additions & 27 deletions

File tree

runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -479,12 +479,7 @@ public <T> void outputWindowedValue(
479479

480480
@Override
481481
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
482-
noteOutput();
483-
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
484-
((TimestampObservingWatermarkEstimator) watermarkEstimator)
485-
.observeTimestamp(windowedValue.getTimestamp());
486-
}
487-
outputReceiver.output(mainOutputTag, windowedValue);
482+
outputWindowedValue(mainOutputTag, windowedValue);
488483
}
489484

490485
@Override

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -445,8 +445,7 @@ public <T> void output(TupleTag<T> tag, T output) {
445445

446446
@Override
447447
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
448-
checkTimestamp(elem.getTimestamp(), windowedValue.getTimestamp());
449-
SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue);
448+
outputWindowedValue(mainOutputTag, windowedValue);
450449
}
451450

452451
@Override
@@ -1041,8 +1040,7 @@ public <T> void outputWindowedValue(
10411040

10421041
@Override
10431042
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
1044-
checkTimestamp(timestamp(), windowedValue.getTimestamp());
1045-
SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue);
1043+
outputWindowedValue(mainOutputTag, windowedValue);
10461044
}
10471045

10481046
@Override
@@ -1312,8 +1310,7 @@ public <T> void outputWindowedValue(
13121310

13131311
@Override
13141312
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
1315-
checkTimestamp(this.timestamp, windowedValue.getTimestamp());
1316-
SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue);
1313+
outputWindowedValue(mainOutputTag, windowedValue);
13171314
}
13181315

13191316
@Override

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -651,18 +651,7 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
651651

652652
@Override
653653
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
654-
for (BoundedWindow w : windowedValue.getWindows()) {
655-
getMutableOutput(mainOutputTag)
656-
.add(
657-
ValueInSingleWindow.of(
658-
windowedValue.getValue(),
659-
windowedValue.getTimestamp(),
660-
w,
661-
windowedValue.getPaneInfo(),
662-
windowedValue.getRecordId(),
663-
windowedValue.getRecordOffset(),
664-
windowedValue.causedByDrain()));
665-
}
654+
outputWindowedValue(mainOutputTag, windowedValue);
666655
}
667656

668657
@Override

sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,6 @@
3030
import org.junit.runners.JUnit4;
3131

3232
public class MetadataPropagationTest {
33-
private static final Integer[] EMPTY = new Integer[] {};
34-
private static final Integer[] DATA = new Integer[] {1, 2, 3, 4, 5};
35-
private static final Integer[] REPEATED_DATA = new Integer[] {1, 1, 2, 2, 3, 3, 4, 4, 5, 5};
3633

3734
@RunWith(JUnit4.class)
3835
public static class MiscTest {

0 commit comments

Comments
 (0)