5656import java .util .NavigableMap ;
5757import java .util .OptionalLong ;
5858import java .util .TreeMap ;
59+ import java .util .concurrent .CompletableFuture ;
5960
6061/**
6162 * This is the single (non-parallel) monitoring task, it is responsible for:
@@ -130,6 +131,7 @@ private class Reader extends AbstractNonCoordinatedSourceReader<Split> {
130131 Long .parseLong (x .split (":" )[0 ]),
131132 Long .parseLong (x .split (":" )[1 ])));
132133 private final TreeMap <Long , Long > nextSnapshotPerCheckpoint = new TreeMap <>();
134+ private CompletableFuture <Void > availableFuture = CompletableFuture .completedFuture (null );
133135
134136 @ Override
135137 public void notifyCheckpointComplete (long checkpointId ) {
@@ -185,6 +187,11 @@ public void addSplits(List<SimpleSourceSplit> list) {
185187 }
186188 }
187189
190+ @ Override
191+ public CompletableFuture <Void > isAvailable () {
192+ return availableFuture ;
193+ }
194+
188195 @ Override
189196 public InputStatus pollNext (ReaderOutput <Split > readerOutput ) throws Exception {
190197 boolean isEmpty ;
@@ -209,7 +216,15 @@ public InputStatus pollNext(ReaderOutput<Split> readerOutput) throws Exception {
209216 }
210217
211218 if (isEmpty ) {
212- Thread .sleep (monitorInterval );
219+ availableFuture =
220+ CompletableFuture .runAsync (
221+ () -> {
222+ try {
223+ Thread .sleep (monitorInterval );
224+ } catch (InterruptedException ignored ) {
225+ }
226+ });
227+ return InputStatus .NOTHING_AVAILABLE ;
213228 }
214229 return InputStatus .MORE_AVAILABLE ;
215230 }
0 commit comments