[FLINK-38543] Change the overall UC restore process, JM and task initialization#27862
fe6cb30 to f7b708b
…de toBeConsumedBuffers
…ot for recovered buffers
907bf3a to 7f75e13
…r availability for recovered buffers
…n if it is blocked to ensure the checkpoint barrier can be handled by downstream task
Priority events (e.g. unaligned checkpoint barriers) must notify downstream even when the subpartition is blocked.
During recovery, once the upstream output channel state is fully restored, a RECOVERY_COMPLETION event (EndOfOutputChannelStateEvent) is emitted. This event blocks the subpartition to prevent the upstream from sending new data while the downstream is still consuming recovered buffers. The subpartition remains blocked until the downstream finishes consuming all recovered buffers from every channel and calls resumeConsumption() to unblock.
If a checkpoint is triggered while the downstream is still consuming recovered buffers, the upstream receives an unaligned checkpoint barrier and adds it to this blocked subpartition. The barrier must still be delivered to the downstream immediately, otherwise the checkpoint will hang until it times out.
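The rule in this commit message can be illustrated with a minimal sketch. It is a toy model only: `BlockedSubpartitionSketch`, its method names (apart from `resumeConsumption()`, which the message mentions) and the string payloads are hypothetical and do not mirror Flink's actual subpartition classes.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical, simplified model of a subpartition; not Flink's actual class. */
final class BlockedSubpartitionSketch {
    private final Deque<String> queue = new ArrayDeque<>();
    private boolean blocked;
    private int notifications; // "data available" signals sent to the downstream

    /** Models the recovery-completion event blocking the subpartition. */
    void block() {
        blocked = true;
    }

    /** Models the downstream calling resumeConsumption() after draining recovered buffers. */
    void resumeConsumption() {
        blocked = false;
        if (!queue.isEmpty()) {
            notifications++; // announce data that queued up while blocked
        }
    }

    /** Regular data: queued, but announced only while the subpartition is not blocked. */
    void add(String buffer) {
        queue.addLast(buffer);
        if (!blocked) {
            notifications++;
        }
    }

    /** Priority event: jumps the queue and is announced even while blocked. */
    void addPriorityEvent(String event) {
        queue.addFirst(event);
        notifications++;
    }

    int notifications() {
        return notifications;
    }

    String head() {
        return queue.peekFirst();
    }

    /** Scenario from the commit message; returns the notifications sent while blocked. */
    static int notificationsWhileBlocked() {
        BlockedSubpartitionSketch subpartition = new BlockedSubpartitionSketch();
        subpartition.block();
        subpartition.add("new-data");                        // held back, no notification
        subpartition.addPriorityEvent("checkpoint-barrier"); // must still notify
        return subpartition.notifications();
    }

    public static void main(String[] args) {
        System.out.println(notificationsWhileBlocked());
    }
}
```

In this model only the barrier produces a notification while the subpartition is blocked, which is the behavior the downstream needs in order to handle the checkpoint before it times out.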
7f75e13 to 3dcb525
… physical channels
// Return allOf result instead of thenRun result.
// thenRun returns a NEW future that completes after the callback finishes.
// Since suspend() runs on the async thread and just sends a poison mail,
// the mailbox loop can exit before suspend() returns, causing isDone() to be false.
CompletableFuture<Void> allRecoveredFuture =
        CompletableFuture.allOf(recoveredFutures.toArray(new CompletableFuture[0]));
allRecoveredFuture.thenRun(mailboxProcessor::suspend);
return allRecoveredFuture;
Please add test coverage for this issue.
Also, doesn't this deserve a separate bug fix? Couldn't this cause some critical problems?
This race condition only manifests with the new checkpointingDuringRecovery logic, because bufferFilteringCompleteFuture completes much earlier (when state is written) than stateConsumedFuture (when state is consumed), which widens the race window. For that reason no separate JIRA is needed. The race is inherently non-deterministic and cannot be reliably reproduced in a unit test; the existing checkState(allGatesRecoveredFuture.isDone()) in restoreInternal() serves as the runtime assertion.
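For readers following the thread, the difference between the two futures can be shown deterministically outside Flink. The sketch below is an illustration under assumptions: `ThenRunVersusAllOf`, `gateA` and `gateB` are hypothetical names, and the callback stands in for the point at which another thread could already observe the result. It relies only on the documented `CompletableFuture` behavior that the future returned by `thenRun` completes after the callback returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical demo class; the names are illustrative and not taken from Flink. */
final class ThenRunVersusAllOf {

    /**
     * Returns {allOf.isDone(), chained.isDone()} as observed from inside the thenRun
     * callback, i.e. at a moment when the callback has started but not yet returned.
     */
    static boolean[] observeInsideCallback() {
        CompletableFuture<Void> gateA = new CompletableFuture<>();
        CompletableFuture<Void> gateB = new CompletableFuture<>();
        CompletableFuture<Void> all = CompletableFuture.allOf(gateA, gateB);

        AtomicReference<CompletableFuture<Void>> chainedRef = new AtomicReference<>();
        boolean[] observed = new boolean[2];

        // thenRun returns a NEW future that completes only after this callback returns.
        CompletableFuture<Void> chained =
                all.thenRun(
                        () -> {
                            observed[0] = all.isDone();
                            observed[1] = chainedRef.get().isDone();
                        });
        chainedRef.set(chained);

        gateA.complete(null);
        gateB.complete(null); // completes allOf and runs the callback on this thread
        return observed;
    }

    public static void main(String[] args) {
        boolean[] observed = observeInsideCallback();
        System.out.println("allOf done inside callback:   " + observed[0]);
        System.out.println("thenRun done inside callback: " + observed[1]);
    }
}
```

Inside the callback the `allOf` future is already done while the `thenRun` future is not, so a caller that checks `isDone()` as soon as the callback's side effect becomes visible is only safe with the `allOf` result.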
if (inputGate.isCheckpointingDuringRecoveryEnabled()) {
    Preconditions.checkState(
            bufferFilteringCompleteFuture.isDone(), "buffer filtering is not complete");
I think it would be a better invariant to always checkState this, regardless of whether isCheckpointingDuringRecoveryEnabled is enabled, and to require that bufferFilteringCompleteFuture is always completed.
Otherwise we might end up in an inconsistent state where bufferFilteringCompleteFuture.isDone() == false while stateConsumedFuture.isDone() == true, which doesn't make sense.
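A minimal sketch of the unconditional invariant suggested here, assuming a simplified channel that owns both futures. `RecoveryFuturesSketch`, `finishFiltering()` and `finishConsumption()` are hypothetical names, and a plain `IllegalStateException` stands in for `Preconditions.checkState` so that the example stays self-contained.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for a recovering input channel; not Flink's actual class. */
final class RecoveryFuturesSketch {
    final CompletableFuture<Void> bufferFilteringCompleteFuture = new CompletableFuture<>();
    final CompletableFuture<Void> stateConsumedFuture = new CompletableFuture<>();

    /** Models the point at which the recovered state has been written. */
    void finishFiltering() {
        bufferFilteringCompleteFuture.complete(null);
    }

    /** Models the point at which all recovered state has been consumed. */
    void finishConsumption() {
        // Checked unconditionally, independent of any feature flag.
        if (!bufferFilteringCompleteFuture.isDone()) {
            throw new IllegalStateException("buffer filtering is not complete");
        }
        stateConsumedFuture.complete(null);
    }

    /** True if consuming before filtering is rejected and leaves the state consistent. */
    static boolean rejectsOutOfOrderCompletion() {
        RecoveryFuturesSketch channel = new RecoveryFuturesSketch();
        try {
            channel.finishConsumption();
            return false;
        } catch (IllegalStateException expected) {
            return !channel.stateConsumedFuture.isDone();
        }
    }

    /** True if the normal order (filtering first, then consumption) completes both futures. */
    static boolean acceptsInOrderCompletion() {
        RecoveryFuturesSketch channel = new RecoveryFuturesSketch();
        channel.finishFiltering();
        channel.finishConsumption();
        return channel.bufferFilteringCompleteFuture.isDone()
                && channel.stateConsumedFuture.isDone();
    }

    public static void main(String[] args) {
        System.out.println(rejectsOutOfOrderCompletion());
        System.out.println(acceptsInOrderCompletion());
    }
}
```

With the check in place, the combination of an incomplete bufferFilteringCompleteFuture and a completed stateConsumedFuture cannot occur in this model.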
 * future completes before {@link #getStateConsumedFuture()}, enabling earlier RUNNING state
 * transition when unaligned checkpoint during recovery is enabled.
 */
public abstract CompletableFuture<Void> getBufferFilteringCompleteFuture();
This commit lacks test coverage. You should add a single simple test, for example for UnionInputGate; that would indirectly cover RecoveredInputChannel and SingleInputGate.
Added UnionInputGateTest.testBufferFilteringCompleteFutureAggregation
3dcb525 to e9d3b49
…y finished
Return allOf result instead of thenRun result. thenRun returns a NEW future that completes after the callback finishes. Since suspend() runs on the async thread and just sends a poison mail, the mailbox loop can exit before suspend() returns, causing isDone() to be false.
This race is practically only triggered when checkpointingDuringRecovery is enabled, because bufferFilteringCompleteFuture completes much earlier (when state is written) than stateConsumedFuture (when state is consumed), creating a wider race window.
… earlier RUNNING state transition
…point during recovery
e9d3b49 to 355c85d
1996fanrui
left a comment
Thanks @pnowojski for the review, all comments are addressed.
This PR depends on #27782, #27783 and #27861
What is the purpose of the change
[FLINK-38543] Change the overall UC restore process, JM and task initialization
Brief change log
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation