File tree Expand file tree Collapse file tree
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -913,8 +913,14 @@ private CompletableFuture<Void> restoreStateAndGates(
913913 "Input gate request partitions" ));
914914 }
915915
916- return CompletableFuture .allOf (recoveredFutures .toArray (new CompletableFuture [0 ]))
917- .thenRun (mailboxProcessor ::suspend );
916+ // Return allOf result instead of thenRun result.
917+ // thenRun returns a NEW future that completes after the callback finishes.
918+ // Since suspend() runs on the async thread and just sends a poison mail,
919+ // the mailbox loop can exit before suspend() returns, causing isDone() to be false.
920+ CompletableFuture <Void > allRecoveredFuture =
921+ CompletableFuture .allOf (recoveredFutures .toArray (new CompletableFuture [0 ]));
922+ allRecoveredFuture .thenRun (mailboxProcessor ::suspend );
923+ return allRecoveredFuture ;
918924 }
919925
920926 private void ensureNotCanceled () {
You can’t perform that action at this time.
0 commit comments