Skip to content

Commit d5ef9e5

Browse files
committed
Fix tests
1 parent 9e83f33 commit d5ef9e5

1 file changed

Lines changed: 12 additions & 8 deletions

File tree

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -680,7 +680,6 @@ public void testReadUnboundedReader() throws Exception {
680680
numReadOnThisIteration, lessThanOrEqualTo(debugOptions.getUnboundedReaderMaxElements()));
681681

682682
// Extract and verify state modifications.
683-
context.finishKey();
684683
context.flushState();
685684
state = context.getOutputBuilder().getSourceStateUpdates().getState();
686685
// CountingSource's watermark is the last record + 1. i is now one past the last record,
@@ -1042,14 +1041,19 @@ public void testFailedWorkItemsAbort() throws Exception {
10421041
NativeReaderIterator<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>> readerIterator =
10431042
reader.iterator();
10441043
int numReads = 0;
1045-
while ((numReads == 0) ? readerIterator.start() : readerIterator.advance()) {
1046-
WindowedValue<ValueWithRecordId<KV<Integer, Integer>>> value = readerIterator.getCurrent();
1047-
assertEquals(KV.of(0, numReads), value.getValue().getValue());
1048-
numReads++;
1049-
// Fail the work item after reading two elements.
1050-
if (numReads == 2) {
1051-
dummyWork.setFailed();
1044+
try {
1045+
while ((numReads == 0) ? readerIterator.start() : readerIterator.advance()) {
1046+
WindowedValue<ValueWithRecordId<KV<Integer, Integer>>> value = readerIterator.getCurrent();
1047+
assertEquals(KV.of(0, numReads), value.getValue().getValue());
1048+
numReads++;
1049+
// Fail the work item after reading two elements.
1050+
if (numReads == 2) {
1051+
dummyWork.setFailed();
1052+
}
10521053
}
1054+
fail("Expected WorkItemCancelledException");
1055+
} catch (WorkItemCancelledException e) {
1056+
// Expected
10531057
}
10541058
assertThat(numReads, equalTo(2));
10551059
}

0 commit comments

Comments
 (0)