@@ -351,6 +351,255 @@ void testRestartingJobFromSavepointInSnapshotMode(TarballFetcher.CdcVersion migr
351351 LOG .info ("Snapshot stage finished successfully." );
352352 }
353353
354+ @ Test
355+ void testProjectionChangeAfterSavepointRestart () throws Exception {
356+ runInContainerAsRoot (jobManager , "chmod" , "0777" , "-R" , "/tmp/cdc/" );
357+
358+ // Phase 1: Start with narrow projection (id, name only)
359+ String contentV1 = buildCustomersProjectionPipeline ("id, name" );
360+ JobID jobID = submitPipelineJob (contentV1 );
361+ Assertions .assertThat (jobID ).isNotNull ();
362+ LOG .info ("Submitted Job ID is {} " , jobID );
363+
364+ // Phase 2: Validate snapshot with narrow projection
365+ validateResult (
366+ dbNameFormatter ,
367+ "CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink'}, primaryKeys=id, options=()}" ,
368+ "DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1], op=INSERT, meta=()}" ,
369+ "DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2], op=INSERT, meta=()}" ,
370+ "DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3], op=INSERT, meta=()}" ,
371+ "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4], op=INSERT, meta=()}" );
372+ LOG .info ("Snapshot stage finished successfully." );
373+
374+ // Phase 3: Validate incremental with narrow projection
375+ executeMySqlStatements (
376+ mysqlInventoryDatabase ,
377+ "INSERT INTO customers VALUES (105, 'user_5', 'Beijing', '123567891235');" );
378+ validateResult (
379+ dbNameFormatter ,
380+ "DataChangeEvent{tableId=%s.customers, before=[], after=[105, user_5], op=INSERT, meta=()}" );
381+ LOG .info ("Incremental stage 1 finished successfully." );
382+
383+ // Phase 4: Stop with savepoint
384+ String savepointPath = stopJobWithSavepoint (jobID );
385+ LOG .info ("Stopped Job {} and created a savepoint at {}." , jobID , savepointPath );
386+
387+ // Phase 5: Restart with expanded projection (id, name, address)
388+ String contentV2 = buildCustomersProjectionPipeline ("id, name, address" );
389+ JobID newJobID = submitPipelineJob (contentV2 , savepointPath , true );
390+ LOG .info ("Reincarnated Job {} has been submitted successfully." , newJobID );
391+
392+ // Phase 6: Validate schema evolution - AddColumnEvent for address
393+ validateResult (
394+ dbNameFormatter ,
395+ "AddColumnEvent{tableId=%s.customers, addedColumns=[ColumnWithPosition{column=`address` VARCHAR(1024), position=LAST, existedColumnName=null}]}" );
396+ LOG .info ("Schema evolution (AddColumn) validated successfully." );
397+
398+ // Phase 7: Validate new data includes address
399+ executeMySqlStatements (
400+ mysqlInventoryDatabase ,
401+ "INSERT INTO customers VALUES (106, 'user_6', 'Shenzhen', '123567891236');" );
402+ validateResult (
403+ dbNameFormatter ,
404+ "DataChangeEvent{tableId=%s.customers, before=[], after=[106, user_6, Shenzhen], op=INSERT, meta=()}" );
405+ LOG .info ("Incremental stage 2 with expanded projection finished successfully." );
406+
407+ // Cleanup
408+ cancelJob (newJobID );
409+ }
410+
411+ @ Test
412+ void testProjectionShrinkAfterSavepointRestart () throws Exception {
413+ runInContainerAsRoot (jobManager , "chmod" , "0777" , "-R" , "/tmp/cdc/" );
414+
415+ // Phase 1: Start with wider projection (id, name, address)
416+ String contentV1 = buildCustomersProjectionPipeline ("id, name, address" );
417+ JobID jobID = submitPipelineJob (contentV1 );
418+ Assertions .assertThat (jobID ).isNotNull ();
419+ LOG .info ("Submitted Job ID is {} " , jobID );
420+
421+ // Phase 2: Validate snapshot with wider projection
422+ validateResult (
423+ dbNameFormatter ,
424+ "CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024)}, primaryKeys=id, options=()}" ,
425+ "DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai], op=INSERT, meta=()}" ,
426+ "DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai], op=INSERT, meta=()}" ,
427+ "DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai], op=INSERT, meta=()}" ,
428+ "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai], op=INSERT, meta=()}" );
429+ LOG .info ("Snapshot stage finished successfully." );
430+
431+ // Phase 3: Validate incremental with wider projection
432+ executeMySqlStatements (
433+ mysqlInventoryDatabase ,
434+ "INSERT INTO customers VALUES (105, 'user_5', 'Beijing', '123567891235');" );
435+ validateResult (
436+ dbNameFormatter ,
437+ "DataChangeEvent{tableId=%s.customers, before=[], after=[105, user_5, Beijing], op=INSERT, meta=()}" );
438+ LOG .info ("Incremental stage 1 finished successfully." );
439+
440+ // Phase 4: Stop with savepoint
441+ String savepointPath = stopJobWithSavepoint (jobID );
442+ LOG .info ("Stopped Job {} and created a savepoint at {}." , jobID , savepointPath );
443+
444+ // Phase 5: Restart with narrower projection (id, name only)
445+ String contentV2 = buildCustomersProjectionPipeline ("id, name" );
446+ JobID newJobID = submitPipelineJob (contentV2 , savepointPath , true );
447+ LOG .info ("Reincarnated Job {} has been submitted successfully." , newJobID );
448+
449+ // Phase 6: Validate schema evolution - DropColumnEvent for address
450+ validateResult (
451+ dbNameFormatter ,
452+ "DropColumnEvent{tableId=%s.customers, droppedColumnNames=[address]}" );
453+ LOG .info ("Schema evolution (DropColumn) validated successfully." );
454+
455+ // Phase 7: Validate new data has only id and name
456+ executeMySqlStatements (
457+ mysqlInventoryDatabase ,
458+ "INSERT INTO customers VALUES (106, 'user_6', 'Shenzhen', '123567891236');" );
459+ validateResult (
460+ dbNameFormatter ,
461+ "DataChangeEvent{tableId=%s.customers, before=[], after=[106, user_6], op=INSERT, meta=()}" );
462+ LOG .info ("Incremental stage 2 with narrowed projection finished successfully." );
463+
464+ // Cleanup
465+ cancelJob (newJobID );
466+ }
467+
468+ @ Test
469+ void testProjectionColumnTypeChangeAfterSavepointRestart () throws Exception {
470+ runInContainerAsRoot (jobManager , "chmod" , "0777" , "-R" , "/tmp/cdc/" );
471+
472+ // Phase 1: Start with projection (id, name) where name is VARCHAR(255) NOT NULL
473+ String contentV1 = buildCustomersProjectionPipeline ("id, name" );
474+ JobID jobID = submitPipelineJob (contentV1 );
475+ Assertions .assertThat (jobID ).isNotNull ();
476+ LOG .info ("Submitted Job ID is {} " , jobID );
477+
478+ // Phase 2: Validate snapshot with {id INT, name VARCHAR(255)} schema
479+ validateResult (
480+ dbNameFormatter ,
481+ "CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink'}, primaryKeys=id, options=()}" ,
482+ "DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1], op=INSERT, meta=()}" ,
483+ "DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2], op=INSERT, meta=()}" ,
484+ "DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3], op=INSERT, meta=()}" ,
485+ "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4], op=INSERT, meta=()}" );
486+ LOG .info ("Snapshot stage finished successfully." );
487+
488+ // Phase 3: Stop with savepoint
489+ String savepointPath = stopJobWithSavepoint (jobID );
490+ LOG .info ("Stopped Job {} and created a savepoint at {}." , jobID , savepointPath );
491+
492+ // Phase 4: Restart with type-changed projection (CAST name to VARCHAR(100))
493+ String contentV2 =
494+ buildCustomersProjectionPipeline ("id, CAST(name AS VARCHAR(100)) AS name" );
495+ JobID newJobID = submitPipelineJob (contentV2 , savepointPath , true );
496+ LOG .info ("Reincarnated Job {} has been submitted successfully." , newJobID );
497+
498+ // Phase 5: Validate schema evolution - AlterColumnTypeEvent for name column
499+ validateResult (
500+ dbNameFormatter ,
501+ "AlterColumnTypeEvent{tableId=%s.customers, typeMapping={name=VARCHAR(100)}, oldTypeMapping={name=VARCHAR(255) NOT NULL}}" );
502+ LOG .info ("Schema evolution (AlterColumnType) validated successfully." );
503+
504+ // Phase 6: Validate new data flows correctly with changed type
505+ executeMySqlStatements (
506+ mysqlInventoryDatabase ,
507+ "INSERT INTO customers VALUES (105, 'user_5', 'Beijing', '123567891235');" );
508+ validateResult (
509+ dbNameFormatter ,
510+ "DataChangeEvent{tableId=%s.customers, before=[], after=[105, user_5], op=INSERT, meta=()}" );
511+ LOG .info ("Incremental stage with type-changed projection finished successfully." );
512+
513+ // Cleanup
514+ cancelJob (newJobID );
515+ }
516+
517+ @ Test
518+ void testIdenticalProjectionAfterSavepointRestart () throws Exception {
519+ runInContainerAsRoot (jobManager , "chmod" , "0777" , "-R" , "/tmp/cdc/" );
520+
521+ // Phase 1: Start with projection (id, name)
522+ String contentV1 = buildCustomersProjectionPipeline ("id, name" );
523+ JobID jobID = submitPipelineJob (contentV1 );
524+ Assertions .assertThat (jobID ).isNotNull ();
525+ LOG .info ("Submitted Job ID is {} " , jobID );
526+
527+ // Phase 2: Validate snapshot with {id INT, name VARCHAR(255)} schema and data
528+ validateResult (
529+ dbNameFormatter ,
530+ "CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink'}, primaryKeys=id, options=()}" ,
531+ "DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1], op=INSERT, meta=()}" ,
532+ "DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2], op=INSERT, meta=()}" ,
533+ "DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3], op=INSERT, meta=()}" ,
534+ "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4], op=INSERT, meta=()}" );
535+ LOG .info ("Snapshot stage finished successfully." );
536+
537+ // Phase 3: Validate incremental mode works before savepoint
538+ executeMySqlStatements (
539+ mysqlInventoryDatabase ,
540+ "INSERT INTO customers VALUES (105, 'user_5', 'Beijing', '123567891235');" );
541+ validateResult (
542+ dbNameFormatter ,
543+ "DataChangeEvent{tableId=%s.customers, before=[], after=[105, user_5], op=INSERT, meta=()}" );
544+ LOG .info ("Incremental stage 1 finished successfully." );
545+
546+ // Phase 4: Stop with savepoint
547+ String savepointPath = stopJobWithSavepoint (jobID );
548+ LOG .info ("Stopped Job {} and created a savepoint at {}." , jobID , savepointPath );
549+
550+ // Phase 5: Restart with the exact same projection (id, name)
551+ String contentV2 = buildCustomersProjectionPipeline ("id, name" );
552+ JobID newJobID = submitPipelineJob (contentV2 , savepointPath , true );
553+ LOG .info ("Reincarnated Job {} has been submitted successfully." , newJobID );
554+
555+ // Phase 6: Insert a new row and validate only the DataChangeEvent appears
556+ // No AddColumnEvent, DropColumnEvent, or AlterColumnTypeEvent should be emitted
557+ // because the projection is identical — isSchemaChangeEventRedundant() suppresses them
558+ executeMySqlStatements (
559+ mysqlInventoryDatabase ,
560+ "INSERT INTO customers VALUES (106, 'user_6', 'Shenzhen', '123567891236');" );
561+ validateResult (
562+ dbNameFormatter ,
563+ "DataChangeEvent{tableId=%s.customers, before=[], after=[106, user_6], op=INSERT, meta=()}" );
564+ LOG .info (
565+ "Incremental stage 2 with identical projection finished successfully - no schema change events emitted." );
566+
567+ // Cleanup
568+ cancelJob (newJobID );
569+ }
570+
571+ private String buildCustomersProjectionPipeline (String projection ) {
572+ return String .format (
573+ "source:\n "
574+ + " type: mysql\n "
575+ + " hostname: %s\n "
576+ + " port: %d\n "
577+ + " username: %s\n "
578+ + " password: %s\n "
579+ + " tables: %s.customers\n "
580+ + " server-id: 5400-5404\n "
581+ + " server-time-zone: UTC\n "
582+ + "\n "
583+ + "sink:\n "
584+ + " type: values\n "
585+ + "\n "
586+ + "transform:\n "
587+ + " - source-table: %s.customers\n "
588+ + " projection: %s\n "
589+ + "\n "
590+ + "pipeline:\n "
591+ + " parallelism: %d\n "
592+ + " schema.change.behavior: evolve\n " ,
593+ INTER_CONTAINER_MYSQL_ALIAS ,
594+ MySqlContainer .MYSQL_PORT ,
595+ MYSQL_TEST_USER ,
596+ MYSQL_TEST_PASSWORD ,
597+ mysqlInventoryDatabase .getDatabaseName (),
598+ mysqlInventoryDatabase .getDatabaseName (),
599+ projection ,
600+ parallelism );
601+ }
602+
354603 private void generateIncrementalEventsPhaseOne () {
355604 executeMySqlStatements (
356605 mysqlInventoryDatabase ,
0 commit comments