Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Commit 02acad4

Browse files
jkfflukecwik
authored andcommitted
Fixes a bug in custom unbounded readers
Custom unbounded readers are read in bundles of at most 10k elements or 10 seconds. A recent change accidentally removed the 10k element limit. This change reintroduces it and adds a test. The previous test also was passing vacuously because the iteration limit was incorrect (it would always have only one iteration). ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=112723469
1 parent 2713af1 commit 02acad4

2 files changed

Lines changed: 23 additions & 9 deletions

File tree

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -546,22 +546,24 @@ public double getRemainingParallelism() {
546546
}
547547
}
548548

549+
// Commit at least once every 10 seconds or 10k records. This keeps the watermark advancing
550+
// smoothly, and ensures that not too much work will have to be reprocessed in the event of
551+
// a crash.
552+
@VisibleForTesting
553+
static final int MAX_UNBOUNDED_BUNDLE_SIZE = 10000;
554+
@VisibleForTesting
555+
static final Duration MAX_UNBOUNDED_BUNDLE_READ_TIME = Duration.standardSeconds(10);
556+
549557
private static class UnboundedReaderIterator<T>
550558
extends NativeReader.NativeReaderIterator<WindowedValue<ValueWithRecordId<T>>> {
551-
// Commit at least once every 10 seconds or 10k records. This keeps the watermark advancing
552-
// smoothly, and ensures that not too much work will have to be reprocessed in the event of
553-
// a crash.
554-
private static final int MAX_BUNDLE_SIZE = 10000;
555-
private static final Duration MAX_BUNDLE_READ_TIME = Duration.standardSeconds(10);
556-
557559
private final UnboundedSource.UnboundedReader<T> reader;
558560
private final boolean started;
559561
private final Instant endTime;
560562
private int elemsRead;
561563

562564
private UnboundedReaderIterator(UnboundedSource.UnboundedReader<T> reader, boolean started) {
563565
this.reader = reader;
564-
this.endTime = Instant.now().plus(MAX_BUNDLE_READ_TIME);
566+
this.endTime = Instant.now().plus(MAX_UNBOUNDED_BUNDLE_READ_TIME);
565567
this.elemsRead = 0;
566568
this.started = started;
567569
}
@@ -588,7 +590,7 @@ public boolean start() throws IOException {
588590

589591
@Override
590592
public boolean advance() throws IOException {
591-
if (elemsRead >= MAX_BUNDLE_SIZE
593+
if (elemsRead >= MAX_UNBOUNDED_BUNDLE_SIZE
592594
|| Instant.now().isAfter(endTime)) {
593595
return false;
594596
}
@@ -598,6 +600,7 @@ public boolean advance() throws IOException {
598600
while (true) {
599601
try {
600602
if (reader.advance()) {
603+
elemsRead++;
601604
return true;
602605
}
603606
} catch (Exception e) {

sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSourcesTest.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import static org.hamcrest.Matchers.allOf;
3333
import static org.hamcrest.Matchers.contains;
3434
import static org.hamcrest.Matchers.containsString;
35+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
3536
import static org.junit.Assert.assertArrayEquals;
3637
import static org.junit.Assert.assertEquals;
3738
import static org.junit.Assert.assertFalse;
@@ -632,7 +633,8 @@ public void testReadUnboundedReader() throws Exception {
632633
options.setNumWorkers(5);
633634

634635
ByteString state = ByteString.EMPTY;
635-
for (int i = 0; i < 100; /* Incremented in inner loop */) {
636+
for (int i = 0; i < 10 * CustomSources.MAX_UNBOUNDED_BUNDLE_SIZE;
637+
/* Incremented in inner loop */) {
636638
WindowedValue<ValueWithRecordId<KV<Integer, Integer>>> value;
637639

638640
// Initialize streaming context with state from previous iteration.
@@ -664,6 +666,8 @@ public void testReadUnboundedReader() throws Exception {
664666
iterator = reader.iterator();
665667

666668
// Verify data.
669+
Instant beforeReading = Instant.now();
670+
int numReadOnThisIteration = 0;
667671
for (boolean more = iterator.start(); more; more = iterator.advance()) {
668672
value = iterator.getCurrent();
669673
assertEquals(KV.of(0, i), value.getValue().getValue());
@@ -673,7 +677,14 @@ public void testReadUnboundedReader() throws Exception {
673677
assertThat(value.getWindows(), contains((BoundedWindow) GlobalWindow.INSTANCE));
674678
assertEquals(i, value.getTimestamp().getMillis());
675679
i++;
680+
numReadOnThisIteration++;
676681
}
682+
Instant afterReading = Instant.now();
683+
assertThat(
684+
new Duration(beforeReading, afterReading).getStandardSeconds(),
685+
lessThanOrEqualTo(CustomSources.MAX_UNBOUNDED_BUNDLE_READ_TIME.getStandardSeconds() + 1));
686+
assertThat(
687+
numReadOnThisIteration, lessThanOrEqualTo(CustomSources.MAX_UNBOUNDED_BUNDLE_SIZE));
677688

678689
// Extract and verify state modifications.
679690
context.flushState();

0 commit comments

Comments
 (0)