Skip to content

Commit d4c092e

Browse files
committed
[FLINK-38543][checkpoint] Introduce bufferFilteringCompleteFuture for earlier RUNNING state transition
1 parent 834fefd commit d4c092e

8 files changed

Lines changed: 83 additions & 2 deletions

File tree

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -192,5 +192,12 @@ public String toString() {
192192

193193
public abstract CompletableFuture<Void> getStateConsumedFuture();
194194

195+
/**
196+
* Returns a future that completes when buffer filtering is complete for all channels. This
197+
* future completes before {@link #getStateConsumedFuture()}, enabling earlier RUNNING state
198+
* transition when unaligned checkpoint during recovery is enabled.
199+
*/
200+
public abstract CompletableFuture<Void> getBufferFilteringCompleteFuture();
201+
195202
public abstract void finishReadRecoveredState() throws IOException;
196203
}

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java

Lines changed: 32 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -62,6 +62,13 @@ public abstract class RecoveredInputChannel extends InputChannel implements Chan
6262
private final CompletableFuture<?> stateConsumedFuture = new CompletableFuture<>();
6363
protected final BufferManager bufferManager;
6464

65+
/**
66+
* Future that completes when recovered buffers have been filtered for this channel. This
67+
* completes before stateConsumedFuture, enabling earlier RUNNING state transition when
68+
* unaligned checkpoint during recovery is enabled.
69+
*/
70+
private final CompletableFuture<Void> bufferFilteringCompleteFuture = new CompletableFuture<>();
71+
6572
@GuardedBy("receivedBuffers")
6673
private boolean isReleased;
6774

@@ -109,8 +116,16 @@ public void setChannelStateWriter(ChannelStateWriter channelStateWriter) {
109116
}
110117

111118
public final InputChannel toInputChannel() throws IOException {
112-
Preconditions.checkState(
113-
stateConsumedFuture.isDone(), "recovered state is not fully consumed");
119+
// Check the appropriate future based on configuration:
120+
// - When unaligned during recovery is enabled: check bufferFilteringCompleteFuture
121+
// - When disabled: check stateConsumedFuture (original behavior)
122+
if (inputGate.isCheckpointingDuringRecoveryEnabled()) {
123+
Preconditions.checkState(
124+
bufferFilteringCompleteFuture.isDone(), "buffer filtering is not complete");
125+
} else {
126+
Preconditions.checkState(
127+
stateConsumedFuture.isDone(), "recovered state is not fully consumed");
128+
}
114129

115130
// Extract remaining buffers before conversion.
116131
// These buffers have been filtered but not yet consumed by the Task.
@@ -140,6 +155,14 @@ public void checkpointStopped(long checkpointId) {
140155
protected abstract InputChannel toInputChannelInternal(ArrayDeque<Buffer> remainingBuffers)
141156
throws IOException;
142157

158+
/**
159+
* Returns the future that completes when buffer filtering is complete. This future completes
160+
* before stateConsumedFuture, at the end of finishReadRecoveredState(), but only when
* checkpointing during recovery is enabled on the owning input gate.
161+
*/
162+
CompletableFuture<Void> getBufferFilteringCompleteFuture() {
163+
return bufferFilteringCompleteFuture;
164+
}
165+
143166
CompletableFuture<?> getStateConsumedFuture() {
144167
return stateConsumedFuture;
145168
}
@@ -180,6 +203,13 @@ public void finishReadRecoveredState() throws IOException {
180203
EventSerializer.toBuffer(EndOfInputChannelStateEvent.INSTANCE, false));
181204
bufferManager.releaseFloatingBuffers();
182205
LOG.debug("{}/{} finished recovering input.", inputGate.getOwningTaskName(), channelInfo);
206+
207+
// Complete bufferFilteringCompleteFuture only when unaligned during recovery is enabled.
208+
// This signals that buffer filtering is complete, allowing earlier RUNNING state
209+
// transition. When the config is disabled, this future should not be completed.
210+
if (inputGate.isCheckpointingDuringRecoveryEnabled()) {
211+
bufferFilteringCompleteFuture.complete(null);
212+
}
183213
}
184214

185215
@Nullable

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java

Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -341,6 +341,21 @@ public boolean isCheckpointingDuringRecoveryEnabled() {
341341
return checkpointingDuringRecoveryEnabled;
342342
}
343343

344+
@Override
345+
public CompletableFuture<Void> getBufferFilteringCompleteFuture() {
346+
synchronized (requestLock) {
347+
List<CompletableFuture<?>> futures = new ArrayList<>(numberOfInputChannels);
348+
for (InputChannel inputChannel : inputChannels()) {
349+
if (inputChannel instanceof RecoveredInputChannel) {
350+
futures.add(
351+
((RecoveredInputChannel) inputChannel)
352+
.getBufferFilteringCompleteFuture());
353+
}
354+
}
355+
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
356+
}
357+
}
358+
344359
@Override
345360
public void requestPartitions() {
346361
synchronized (requestLock) {

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -350,6 +350,15 @@ public CompletableFuture<Void> getStateConsumedFuture() {
350350
.toArray(new CompletableFuture[] {}));
351351
}
352352

353+
@Override
354+
public CompletableFuture<Void> getBufferFilteringCompleteFuture() {
355+
return CompletableFuture.allOf(
356+
inputGatesByGateIndex.values().stream()
357+
.map(InputGate::getBufferFilteringCompleteFuture)
358+
.collect(Collectors.toList())
359+
.toArray(new CompletableFuture[] {}));
360+
}
361+
353362
@Override
354363
public void requestPartitions() throws IOException {
355364
for (InputGate inputGate : inputGatesByGateIndex.values()) {

flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -120,6 +120,11 @@ public CompletableFuture<Void> getStateConsumedFuture() {
120120
return inputGate.getStateConsumedFuture();
121121
}
122122

123+
@Override
124+
public CompletableFuture<Void> getBufferFilteringCompleteFuture() {
125+
return inputGate.getBufferFilteringCompleteFuture();
126+
}
127+
123128
@Override
124129
public void requestPartitions() throws IOException {
125130
inputGate.requestPartitions();

flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -57,6 +57,11 @@ public CompletableFuture<Void> getStateConsumedFuture() {
5757
return CompletableFuture.completedFuture(null);
5858
}
5959

60+
@Override
61+
public CompletableFuture<Void> getBufferFilteringCompleteFuture() {
62+
return CompletableFuture.completedFuture(null);
63+
}
64+
6065
@Override
6166
public void finishReadRecoveredState() {}
6267

flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -80,6 +80,11 @@ public CompletableFuture<Void> getStateConsumedFuture() {
8080
return CompletableFuture.completedFuture(null);
8181
}
8282

83+
@Override
84+
public CompletableFuture<Void> getBufferFilteringCompleteFuture() {
85+
return CompletableFuture.completedFuture(null);
86+
}
87+
8388
@Override
8489
public void finishReadRecoveredState() {}
8590

flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -263,6 +263,11 @@ public CompletableFuture<Void> getStateConsumedFuture() {
263263
return CompletableFuture.completedFuture(null);
264264
}
265265

266+
@Override
267+
public CompletableFuture<Void> getBufferFilteringCompleteFuture() {
268+
return CompletableFuture.completedFuture(null);
269+
}
270+
266271
@Override
267272
public void finishReadRecoveredState() {}
268273

0 commit comments

Comments
 (0)