3131import org .apache .paimon .table .source .TableScan ;
3232import org .apache .paimon .utils .BlockingIterator ;
3333
34- import org .apache .flink .api .common .JobID ;
3534import org .apache .flink .configuration .CheckpointingOptions ;
3635import org .apache .flink .configuration .ExternalizedCheckpointRetention ;
3736import org .apache .flink .core .execution .JobClient ;
38- import org .apache .flink .runtime .execution .ExecutionState ;
39- import org .apache .flink .runtime .minicluster .MiniCluster ;
4037import org .apache .flink .streaming .api .datastream .DataStream ;
4138import org .apache .flink .streaming .api .environment .StreamExecutionEnvironment ;
4239import org .apache .flink .table .api .DataTypes ;
4744import org .junit .jupiter .api .Test ;
4845import org .junit .jupiter .api .Timeout ;
4946
50- import java .lang .reflect .Field ;
5147import java .util .ArrayList ;
5248import java .util .Arrays ;
5349import java .util .List ;
5450import java .util .concurrent .TimeUnit ;
55- import java .util .concurrent .atomic .AtomicBoolean ;
5651import java .util .stream .Collectors ;
5752
5853import static org .assertj .core .api .Assertions .assertThat ;
@@ -781,37 +776,6 @@ private void waitForRowCount(String tableName, int minRows) throws Exception {
781776 "Timed out waiting for " + minRows + " rows in " + tableName + ", got " + count );
782777 }
783778
784- /** Polls until all tasks of the given job are in RUNNING state. */
785- private void waitForJobRunning (JobClient jobClient ) throws Exception {
786- Field field = jobClient .getClass ().getDeclaredField ("miniCluster" );
787- field .setAccessible (true );
788- MiniCluster miniCluster = (MiniCluster ) field .get (jobClient );
789- JobID jobID = jobClient .getJobID ();
790-
791- long deadline = System .currentTimeMillis () + 60_000 ;
792- while (System .currentTimeMillis () < deadline ) {
793- AtomicBoolean allRunning = new AtomicBoolean (true );
794- miniCluster
795- .getExecutionGraph (jobID )
796- .thenAccept (
797- eg ->
798- eg .getAllExecutionVertices ()
799- .forEach (
800- v -> {
801- if (v .getExecutionState ()
802- != ExecutionState .RUNNING ) {
803- allRunning .set (false );
804- }
805- }))
806- .get ();
807- if (allRunning .get ()) {
808- return ;
809- }
810- Thread .sleep (1000 );
811- }
812- throw new AssertionError ("Timed out waiting for job " + jobID + " to reach RUNNING state" );
813- }
814-
815779 /**
816780 * Tests the streaming read lifecycle for a chain table with changelog-producer=input.
817781 *
@@ -1051,9 +1015,6 @@ public void testStreamingReadChainTableStatefulRestart() throws Exception {
10511015 //noinspection OptionalGetWithoutIsPresent
10521016 JobClient jobClient = tableResult .getJobClient ().get ();
10531017
1054- // Wait for streaming job to be fully running before writing Phase 2 data.
1055- waitForJobRunning (jobClient );
1056-
10571018 // === Phase 2: Write incremental delta, let Phase 2 consume it, THEN checkpoint ===
10581019 // This exercises the checkpoint() regression: if checkpoint() returns the stale
10591020 // Phase 1 boundary instead of the advanced delta cursor, restore would re-read
@@ -1067,16 +1028,18 @@ public void testStreamingReadChainTableStatefulRestart() throws Exception {
10671028 // advanced cursor — the exact scenario the regression would break.
10681029 waitForRowCount ("chain_restart_sink" , 7 );
10691030
1070- // Create a savepoint for restart. The enumerator state now includes the advanced
1071- // delta cursor (past delta@20250809).
1072- String checkpointPath = triggerCheckpoint (jobClient );
1073-
1074- java .io .File cpFile =
1075- new java .io .File (checkpointPath .replace ("file:/" , "/" ).replace ("file://" , "/" ));
1076- if (!cpFile .exists ()) {
1077- cpFile = cpFile .getParentFile ();
1078- }
1079- assertThat (cpFile .exists ()).as ("Checkpoint directory should exist: " + cpFile ).isTrue ();
1031+ // Create a savepoint and stop the job atomically.
1032+ // stopWithSavepoint guarantees the savepoint is fully committed to disk
1033+ // before returning.
1034+ String savepointDir = path + "/savepoints" ;
1035+ new java .io .File (savepointDir ).mkdirs ();
1036+ String checkpointPath =
1037+ jobClient
1038+ .stopWithSavepoint (
1039+ false ,
1040+ savepointDir ,
1041+ org .apache .flink .core .execution .SavepointFormatType .CANONICAL )
1042+ .get ();
10801043
10811044 // Verify Phase 1+2 data (committed by checkpoint).
10821045 List <String > phase1and2 =
@@ -1094,20 +1057,14 @@ public void testStreamingReadChainTableStatefulRestart() throws Exception {
10941057 "+I[4, 1, new_4, 20250809]" ,
10951058 "+I[5, 1, new_5, 20250809]" );
10961059
1097- jobClient .cancel ().get ();
1098-
1099- // Disable auto-checkpointing before restart to avoid conflicts between
1100- // auto-checkpoints and manual triggerCheckpoint() on the new job.
1101- config .removeKey ("execution.checkpointing.interval" );
1102-
11031060 // === Phase 3: Write new delta data while job is stopped ===
11041061 sql (
11051062 "INSERT INTO `chain_restart$branch_delta` PARTITION (dt = '20250810')"
11061063 + " VALUES (6, 1, 'new_6'), (7, 1, 'new_7')" );
11071064
1108- // === Phase 4: Restart from checkpoint ===
1065+ // === Phase 4: Restart from savepoint ===
11091066 // The restored scan should NOT re-read delta@20250809 (already consumed before
1110- // checkpoint ). If checkpoint() returned the stale Phase 1 boundary, delta@20250809
1067+ // savepoint ). If checkpoint() returned the stale Phase 1 boundary, delta@20250809
11111068 // would be re-read and produce duplicates.
11121069 sEnv .getConfig ()
11131070 .getConfiguration ()
@@ -1117,25 +1074,19 @@ public void testStreamingReadChainTableStatefulRestart() throws Exception {
11171074 //noinspection OptionalGetWithoutIsPresent
11181075 JobClient jobClient2 = tableResult2 .getJobClient ().get ();
11191076
1120- // Trigger checkpoint to commit Phase 4 data (restored scan output + Phase 3 delta).
1121- waitForJobRunning (jobClient2 );
1122- triggerCheckpoint (jobClient2 );
1123-
1124- // Poll sink until checkpoint commits all 9 rows (7 from Phase 1+2 + 2 from Phase 3).
1077+ // Auto-checkpointing (1s interval) is still enabled, so data is committed
1078+ // to the sink automatically. waitForRowCount polls until data appears.
11251079 waitForRowCount ("chain_restart_sink" , 9 );
11261080
1127- // Read all records from the sink table
11281081 List <String > phase4 =
11291082 sql ("SELECT * FROM chain_restart_sink" ).stream ()
11301083 .map (Row ::toString )
11311084 .collect (Collectors .toList ());
11321085
1133- // Verify new data is present
11341086 assertThat (phase4 )
11351087 .as ("Stateful restart: sink should contain new delta data" )
11361088 .contains ("+I[6, 1, new_6, 20250810]" , "+I[7, 1, new_7, 20250810]" );
11371089
1138- // Verify total count: 7 from Phase 1+2 + 2 from Phase 3 = 9, no duplicates
11391090 assertThat (phase4 .size ())
11401091 .as ("Should have exactly 9 records (no duplicates from state recovery)" )
11411092 .isEqualTo (9 );
@@ -1145,9 +1096,6 @@ public void testStreamingReadChainTableStatefulRestart() throws Exception {
11451096 "INSERT INTO `chain_restart$branch_delta` PARTITION (dt = '20250811')"
11461097 + " VALUES (8, 1, 'new_8')" );
11471098
1148- triggerCheckpoint (jobClient2 );
1149-
1150- // Poll sink until checkpoint commits the new incremental row.
11511099 waitForRowCount ("chain_restart_sink" , 10 );
11521100
11531101 List <String > phase5 =
@@ -1461,39 +1409,6 @@ public void testStreamingReadWithGroupPartition() throws Exception {
14611409 it .close ();
14621410 }
14631411
1464- /**
1465- * Triggers a checkpoint on the given job and returns the checkpoint path. Waits until all tasks
1466- * are RUNNING before triggering.
1467- */
1468- private String triggerCheckpoint (JobClient jobClient ) throws Exception {
1469- Field field = jobClient .getClass ().getDeclaredField ("miniCluster" );
1470- field .setAccessible (true );
1471- MiniCluster miniCluster = (MiniCluster ) field .get (jobClient );
1472- JobID jobID = jobClient .getJobID ();
1473-
1474- // Wait for all tasks to be RUNNING
1475- AtomicBoolean allRunning = new AtomicBoolean (false );
1476- while (!allRunning .get ()) {
1477- allRunning .set (true );
1478- Thread .sleep (1000 );
1479- miniCluster
1480- .getExecutionGraph (jobID )
1481- .thenAccept (
1482- eg ->
1483- eg .getAllExecutionVertices ()
1484- .forEach (
1485- v -> {
1486- if (v .getExecutionState ()
1487- != ExecutionState .RUNNING ) {
1488- allRunning .set (false );
1489- }
1490- }))
1491- .get ();
1492- }
1493-
1494- return miniCluster .triggerCheckpoint (jobID ).get ();
1495- }
1496-
14971412 // =========================================================================
14981413 // Additional coverage tests
14991414 // =========================================================================
0 commit comments