Skip to content

[FLINK-38543] Change the overall UC restore process, JM and task initialization#27862

Open
1996fanrui wants to merge 8 commits intoapache:masterfrom
1996fanrui:38543/change-overall-uc-restore-process-for-checkpointint-during-recovery
Open

[FLINK-38543] Change the overall UC restore process, JM and task initialization#27862
1996fanrui wants to merge 8 commits intoapache:masterfrom
1996fanrui:38543/change-overall-uc-restore-process-for-checkpointint-during-recovery

Conversation

@1996fanrui
Copy link
Copy Markdown
Member

@1996fanrui 1996fanrui commented Mar 31, 2026

This PR depends on #27782, #27783 and #27861

What is the purpose of the change

[FLINK-38543] Change the overall UC restore process, JM and task initialization

Brief change log

  • [FLINK-38543][checkpoint] Fix Mailbox loop interrupted before recovery finished
  • [FLINK-38543][checkpoint] Introduce bufferFilteringCompleteFuture for earlier RUNNING state transition
  • [FLINK-38543][checkpoint] Change overall UC restore process for checkpoint during recovery

Verifying this change

  • Tons of unit tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive):no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector:no

Documentation

  • Does this pull request introduce a new feature? no

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Mar 31, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@1996fanrui 1996fanrui force-pushed the 38543/change-overall-uc-restore-process-for-checkpointint-during-recovery branch 2 times, most recently from fe6cb30 to f7b708b Compare March 31, 2026 17:29
@1996fanrui 1996fanrui force-pushed the 38543/change-overall-uc-restore-process-for-checkpointint-during-recovery branch 2 times, most recently from 907bf3a to 7f75e13 Compare March 31, 2026 19:15
…n if it is blocked to ensure the checkpoint barrier can be handled by downstream task

Priority events (e.g. unaligned checkpoint barriers) must notify downstream even
when the subpartition is blocked.

During recovery, once the upstream output channel state is fully restored, a
RECOVERY_COMPLETION event (EndOfOutputChannelStateEvent) is emitted. This event
blocks the subpartition to prevent the upstream from sending new data while the
downstream is still consuming recovered buffers. The subpartition remains blocked
until the downstream finishes consuming all recovered buffers from every channel
and calls resumeConsumption() to unblock.

If a checkpoint is triggered while the downstream is still consuming recovered
buffers, the upstream receives an unaligned checkpoint barrier and adds it to this
blocked subpartition. The barrier must still be delivered to the downstream
immediately, otherwise the checkpoint will hang until it times out.
@1996fanrui 1996fanrui force-pushed the 38543/change-overall-uc-restore-process-for-checkpointint-during-recovery branch from 7f75e13 to 3dcb525 Compare April 2, 2026 20:53
Comment on lines +916 to +923
// Return allOf result instead of thenRun result.
// thenRun returns a NEW future that completes after the callback finishes.
// Since suspend() runs on the async thread and just sends a poison mail,
// the mailbox loop can exit before suspend() returns, causing isDone() to be false.
CompletableFuture<Void> allRecoveredFuture =
CompletableFuture.allOf(recoveredFutures.toArray(new CompletableFuture[0]));
allRecoveredFuture.thenRun(mailboxProcessor::suspend);
return allRecoveredFuture;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a test coverage for this issue.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, isn't this worthy of a separate bug fix? Can not this cause some critical problems?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This race condition only manifests with the new checkpointingDuringRecovery logic because bufferFilteringCompleteFuture completes much earlier (when state is written) than stateConsumedFuture (when state is consumed), creating a wider race window — so no separate JIRA is needed. The race is inherently non-deterministic and cannot be reliably reproduced in a unit test; the existing checkState(allGatesRecoveredFuture.isDone()) in restoreInternal() serves as the runtime assertion.

Comment on lines +122 to +124
if (inputGate.isCheckpointingDuringRecoveryEnabled()) {
Preconditions.checkState(
bufferFilteringCompleteFuture.isDone(), "buffer filtering is not complete");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better invariant to always checkState this, regarldess if isCheckpointingDuringRecoveryEnabled is enabled or not and that bufferFilteringCompleteFuture should be always completed.

Otherwise we might have an incosistent state where bufferFilteringCompleteFuture.isDone() == false while stateConsumedFuture.isDone() == true, which doesn't make sense.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

* future completes before {@link #getStateConsumedFuture()}, enabling earlier RUNNING state
* transition when unaligned checkpoint during recovery is enabled.
*/
public abstract CompletableFuture<Void> getBufferFilteringCompleteFuture();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit lacks a test coverage. You should add some a single simple test to for example for UnionInputGate - that would indirectly test for RecoveredInputChannel and SingleInputGate

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added UnionInputGateTest.testBufferFilteringCompleteFutureAggregation

@1996fanrui 1996fanrui force-pushed the 38543/change-overall-uc-restore-process-for-checkpointint-during-recovery branch from 3dcb525 to e9d3b49 Compare April 3, 2026 13:58
…y finished

Return allOf result instead of thenRun result.
thenRun returns a NEW future that completes after the callback finishes.
Since suspend() runs on the async thread and just sends a poison mail,
the mailbox loop can exit before suspend() returns, causing isDone() to be false.
This race is practically only triggered when checkpointingDuringRecovery is enabled,
because bufferFilteringCompleteFuture completes much earlier (when state is written)
than stateConsumedFuture (when state is consumed), creating a wider race window.
@1996fanrui 1996fanrui force-pushed the 38543/change-overall-uc-restore-process-for-checkpointint-during-recovery branch from e9d3b49 to 355c85d Compare April 3, 2026 16:51
Copy link
Copy Markdown
Member Author

@1996fanrui 1996fanrui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @pnowojski for the review, all comments are addressed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants