From cd30df56c5530bab41d4dac607e94164b19a7728 Mon Sep 17 00:00:00 2001
From: VinaySagarGonabavi
Date: Tue, 3 Mar 2026 18:41:13 -0800
Subject: [PATCH 1/2] [FLINK-38828][cdc-common][cdc-runtime] Handle schema evolution upon projection updates via SchemaCoordinator

---
 .../flink/cdc/common/utils/SchemaUtils.java   |   5 +-
 .../common/utils/SchemaMergingUtilsTest.java  |  68 ++++++
 .../cdc/common/utils/SchemaUtilsTest.java     |  63 +++++
 .../schema/regular/SchemaCoordinator.java     |  29 ++-
 .../schema/regular/SchemaEvolveTest.java      | 217 ++++++++++++++++++
 5 files changed, 374 insertions(+), 8 deletions(-)

diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
index 131a783f551..5da2f7577e8 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
@@ -348,8 +348,9 @@ public static boolean isSchemaChangeEventRedundant(
                     return true;
                 },
                 createTableEvent -> {
-                    // It has been applied if such table already exists
-                    return latestSchema.isPresent();
+                    // Redundant only if the table exists AND the schema is identical
+                    return latestSchema.isPresent()
+                            && latestSchema.get().equals(createTableEvent.getSchema());
                 },
                 dropColumnEvent -> {
                     // It has not been applied if schema does not even exist
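
For context, the behavioral change in the hunk above, shown with the same builders and event types the new unit tests below use (the table id and column names here are illustrative only):

    Schema known =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .primaryKey("id")
                    .build();
    Schema reProjected =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .physicalColumn("age", DataTypes.INT()) // projection now adds "age"
                    .primaryKey("id")
                    .build();

    // Before this patch the event was deemed redundant as soon as the table existed,
    // so a replayed CreateTableEvent carrying a changed projection was swallowed.
    // With this patch it is only redundant when the schemas are identical:
    SchemaUtils.isSchemaChangeEventRedundant(known, new CreateTableEvent(tableId, reProjected));
    // -> false, which lets the coordinator turn the difference into concrete change events.
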
diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java
index 1f53668d430..102e416c4f7 100644
--- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java
+++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java
@@ -495,6 +495,74 @@ TABLE_ID, of("id", BIGINT), of("name", VARCHAR(17), "id", BIGINT)))
                                 ImmutableMap.of("foo", INT, "baz", FLOAT)));
     }
 
+    @Test
+    void testGetSchemaDifferenceForProjectionChanges() {
+        // Simulate projection change: new schema has an added column
+        Assertions.assertThat(
+                        getSchemaDifference(
+                                TABLE_ID,
+                                of("id", BIGINT, "name", STRING),
+                                of("id", BIGINT, "name", STRING, "age", INT)))
+                .as("projection change adding a column should produce AddColumnEvent")
+                .containsExactly(
+                        new AddColumnEvent(
+                                TABLE_ID,
+                                Collections.singletonList(
+                                        new AddColumnEvent.ColumnWithPosition(
+                                                Column.physicalColumn("age", INT),
+                                                AddColumnEvent.ColumnPosition.AFTER,
+                                                "name"))));
+
+        // Simulate projection change: new schema has a removed column
+        Assertions.assertThat(
+                        getSchemaDifference(
+                                TABLE_ID,
+                                of("id", BIGINT, "name", STRING, "age", INT),
+                                of("id", BIGINT, "name", STRING)))
+                .as("projection change removing a column should produce DropColumnEvent")
+                .containsExactly(new DropColumnEvent(TABLE_ID, Collections.singletonList("age")));
+
+        // Simulate projection change: column type changed (e.g. STRING -> VARCHAR(255))
+        Assertions.assertThat(
+                        getSchemaDifference(
+                                TABLE_ID,
+                                of("id", BIGINT, "name", STRING),
+                                of("id", BIGINT, "name", DataTypes.VARCHAR(255))))
+                .as(
+                        "projection change with different column type should produce AlterColumnTypeEvent")
+                .containsExactly(
+                        new AlterColumnTypeEvent(
+                                TABLE_ID,
+                                Collections.singletonMap("name", DataTypes.VARCHAR(255)),
+                                Collections.singletonMap("name", STRING)));
+
+        // Simulate projection change: both added and removed columns (column swap)
+        Assertions.assertThat(
+                        getSchemaDifference(
+                                TABLE_ID,
+                                of("id", BIGINT, "name", STRING),
+                                of("id", BIGINT, "email", STRING)))
+                .as("projection change swapping columns should produce Add + Drop events")
+                .containsExactly(
+                        new AddColumnEvent(
+                                TABLE_ID,
+                                Collections.singletonList(
+                                        new AddColumnEvent.ColumnWithPosition(
+                                                Column.physicalColumn("email", STRING),
+                                                AddColumnEvent.ColumnPosition.AFTER,
+                                                "id"))),
+                        new DropColumnEvent(TABLE_ID, Collections.singletonList("name")));
+
+        // Simulate projection change: identical schemas should produce no diff
+        Assertions.assertThat(
+                        getSchemaDifference(
+                                TABLE_ID,
+                                of("id", BIGINT, "name", STRING),
+                                of("id", BIGINT, "name", STRING)))
+                .as("identical schemas should produce no schema change events")
+                .isEmpty();
+    }
+
     @Test
     void testMergeAndDiff() {
         Assertions.assertThat(mergeAndDiff(null, of("id", BIGINT, "name", VARCHAR(17))))
diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
index cdf1532d24c..3aa658ba1ad 100644
--- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
+++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
@@ -19,6 +19,7 @@
 import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DropColumnEvent;
 import org.apache.flink.cdc.common.event.RenameColumnEvent;
 import org.apache.flink.cdc.common.event.TableId;
@@ -484,4 +485,66 @@ void testInferWiderSchema() {
                                 .build()))
                 .isExactlyInstanceOf(IllegalStateException.class);
     }
+
+    @Test
+    void testIsSchemaChangeEventRedundantForCreateTableEvent() {
+        TableId tableId = TableId.tableId("test_db", "test_table");
+
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("name", DataTypes.STRING())
+                        .primaryKey("id")
+                        .build();
+
+        // CreateTableEvent is NOT redundant when table doesn't exist (null schema)
+        CreateTableEvent createEvent = new CreateTableEvent(tableId, schema);
+        Assertions.assertThat(SchemaUtils.isSchemaChangeEventRedundant(null, createEvent))
+                .as("CreateTableEvent should not be redundant when table does not exist")
+                .isFalse();
+
+        // CreateTableEvent IS redundant when table exists with the same schema
+        Assertions.assertThat(SchemaUtils.isSchemaChangeEventRedundant(schema, createEvent))
+                .as("CreateTableEvent should be redundant when table exists with same schema")
+                .isTrue();
+
+        // CreateTableEvent is NOT redundant when table exists with different schema (extra column)
+        Schema schemaWithExtraColumn =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("name", DataTypes.STRING())
+                        .physicalColumn("age", DataTypes.INT())
+                        .primaryKey("id")
+                        .build();
+        CreateTableEvent createEventWithExtraColumn =
+                new CreateTableEvent(tableId, schemaWithExtraColumn);
+        Assertions.assertThat(
+                        SchemaUtils.isSchemaChangeEventRedundant(
+                                schema, createEventWithExtraColumn))
+                .as("CreateTableEvent should not be redundant when new schema has extra columns")
+                .isFalse();
+
+        // CreateTableEvent is NOT redundant when column types differ
+        Schema schemaWithDifferentType =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("name", DataTypes.VARCHAR(255))
+                        .primaryKey("id")
+                        .build();
+        CreateTableEvent createEventWithDifferentType =
+                new CreateTableEvent(tableId, schemaWithDifferentType);
+        Assertions.assertThat(
+                        SchemaUtils.isSchemaChangeEventRedundant(
+                                schema, createEventWithDifferentType))
+                .as("CreateTableEvent should not be redundant when column types differ")
+                .isFalse();
+
+        // CreateTableEvent is NOT redundant when existing schema has more columns than new schema
+        Assertions.assertThat(
+                        SchemaUtils.isSchemaChangeEventRedundant(
+                                schemaWithExtraColumn, createEvent))
+                .as(
+                        "CreateTableEvent should not be redundant when existing schema has more columns")
+                .isFalse();
+    }
 }
Computed diff events: {}.", + diffEvents); + } else { + SchemaChangeEvent rawEvent = event.copy(evolvedTableId); + rawSchemaChangeEvents.add(rawEvent); + LOG.info( + "Step 3.3 - It's an one-by-one routing and could be forwarded as {}.", + rawEvent); + } } else { Set toBeMergedSchemas = SchemaDerivator.reverseLookupDependingUpstreamSchemas( diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java index 740c77bd49d..1e37cfc3ee5 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java @@ -2561,6 +2561,223 @@ tableId, buildRecord(INT, 12, INT, 0, SMALLINT, (short) 11)), } } + /** + * Tests that sending a CreateTableEvent for an already-existing table with a different schema + * (simulating a projection change) produces the correct diff events (AddColumnEvent / + * DropColumnEvent) instead of a raw CreateTableEvent. + */ + @Test + void testCreateTableEventWithChangedProjection() throws Exception { + TableId tableId = CUSTOMERS_TABLE_ID; + Schema schemaV1 = + Schema.newBuilder() + .physicalColumn("id", INT) + .physicalColumn("name", STRING) + .physicalColumn("age", SMALLINT) + .primaryKey("id") + .build(); + + SchemaChangeBehavior behavior = SchemaChangeBehavior.EVOLVE; + + SchemaOperator schemaOperator = + new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + RegularEventOperatorTestHarness harness = + RegularEventOperatorTestHarness.withDurationAndBehavior( + schemaOperator, 17, Duration.ofSeconds(3), behavior); + harness.open(); + + // Step 1: Create the table with the initial schema + { + List createAndInsertDataEvents = + Arrays.asList( + new CreateTableEvent(tableId, schemaV1), + DataChangeEvent.insertEvent( + tableId, + buildRecord(INT, 1, STRING, "Alice", SMALLINT, (short) 17))); + + processEvent(schemaOperator, createAndInsertDataEvents); + + Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); + Assertions.assertThat(harness.getLatestEvolvedSchema(tableId)).isEqualTo(schemaV1); + + harness.clearOutputRecords(); + } + + // Step 2: Send a CreateTableEvent with a different schema (added column, simulating + // projection change). The coordinator should convert this into AddColumnEvent. 
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java
index 740c77bd49d..1e37cfc3ee5 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java
@@ -2561,6 +2561,223 @@ tableId, buildRecord(INT, 12, INT, 0, SMALLINT, (short) 11)),
         }
     }
 
+    /**
+     * Tests that sending a CreateTableEvent for an already-existing table with a different schema
+     * (simulating a projection change) produces the correct diff events (AddColumnEvent /
+     * DropColumnEvent) instead of a raw CreateTableEvent.
+     */
+    @Test
+    void testCreateTableEventWithChangedProjection() throws Exception {
+        TableId tableId = CUSTOMERS_TABLE_ID;
+        Schema schemaV1 =
+                Schema.newBuilder()
+                        .physicalColumn("id", INT)
+                        .physicalColumn("name", STRING)
+                        .physicalColumn("age", SMALLINT)
+                        .primaryKey("id")
+                        .build();
+
+        SchemaChangeBehavior behavior = SchemaChangeBehavior.EVOLVE;
+
+        SchemaOperator schemaOperator =
+                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
+        RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
+                RegularEventOperatorTestHarness.withDurationAndBehavior(
+                        schemaOperator, 17, Duration.ofSeconds(3), behavior);
+        harness.open();
+
+        // Step 1: Create the table with the initial schema
+        {
+            List<Event> createAndInsertDataEvents =
+                    Arrays.asList(
+                            new CreateTableEvent(tableId, schemaV1),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(INT, 1, STRING, "Alice", SMALLINT, (short) 17)));
+
+            processEvent(schemaOperator, createAndInsertDataEvents);
+
+            Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
+            Assertions.assertThat(harness.getLatestEvolvedSchema(tableId)).isEqualTo(schemaV1);
+
+            harness.clearOutputRecords();
+        }
+
+        // Step 2: Send a CreateTableEvent with a different schema (added column, simulating
+        // a projection change). The coordinator should convert this into an AddColumnEvent.
+        {
+            Schema schemaV2 =
+                    Schema.newBuilder()
+                            .physicalColumn("id", INT)
+                            .physicalColumn("name", STRING)
+                            .physicalColumn("age", SMALLINT)
+                            .physicalColumn("email", STRING)
+                            .primaryKey("id")
+                            .build();
+
+            List<Event> createWithNewProjection =
+                    Collections.singletonList(new CreateTableEvent(tableId, schemaV2));
+
+            processEvent(schemaOperator, createWithNewProjection);
+
+            // The evolved schema should now include the new "email" column
+            Assertions.assertThat(harness.getLatestEvolvedSchema(tableId)).isEqualTo(schemaV2);
+
+            // Verify that the output contains an AddColumnEvent (not a CreateTableEvent)
+            // for the "email" column, preceded by a FlushEvent
+            List<Event> outputEvents =
+                    harness.getOutputRecords().stream()
+                            .map(StreamRecord::getValue)
+                            .collect(Collectors.toList());
+
+            Assertions.assertThat(outputEvents)
+                    .filteredOn(e -> e instanceof AddColumnEvent)
+                    .hasSize(1);
+
+            AddColumnEvent addColumnEvent =
+                    (AddColumnEvent)
+                            outputEvents.stream()
+                                    .filter(e -> e instanceof AddColumnEvent)
+                                    .findFirst()
+                                    .get();
+            Assertions.assertThat(addColumnEvent.getAddedColumns())
+                    .extracting(c -> c.getAddColumn().getName())
+                    .containsExactly("email");
+
+            // Verify no CreateTableEvent was emitted downstream (it should have been
+            // converted to diff events)
+            Assertions.assertThat(outputEvents)
+                    .filteredOn(e -> e instanceof CreateTableEvent)
+                    .isEmpty();
+
+            harness.clearOutputRecords();
+        }
+
+        // Step 3: Send a CreateTableEvent with a removed column (simulating a projection change
+        // that drops a column). The coordinator should convert this into a DropColumnEvent.
+        {
+            Schema schemaV3 =
+                    Schema.newBuilder()
+                            .physicalColumn("id", INT)
+                            .physicalColumn("name", STRING)
+                            .physicalColumn("email", STRING)
+                            .primaryKey("id")
+                            .build();
+
+            List<Event> createWithDroppedColumn =
+                    Collections.singletonList(new CreateTableEvent(tableId, schemaV3));
+
+            processEvent(schemaOperator, createWithDroppedColumn);
+
+            // The evolved schema should now reflect the dropped "age" column
+            Assertions.assertThat(harness.getLatestEvolvedSchema(tableId)).isEqualTo(schemaV3);
+
+            List<Event> outputEvents =
+                    harness.getOutputRecords().stream()
+                            .map(StreamRecord::getValue)
+                            .collect(Collectors.toList());
+
+            Assertions.assertThat(outputEvents)
+                    .filteredOn(e -> e instanceof DropColumnEvent)
+                    .hasSize(1);
+
+            DropColumnEvent dropColumnEvent =
+                    (DropColumnEvent)
+                            outputEvents.stream()
+                                    .filter(e -> e instanceof DropColumnEvent)
+                                    .findFirst()
+                                    .get();
+            Assertions.assertThat(dropColumnEvent.getDroppedColumnNames()).containsExactly("age");
+
+            // Verify no CreateTableEvent was emitted downstream
+            Assertions.assertThat(outputEvents)
+                    .filteredOn(e -> e instanceof CreateTableEvent)
+                    .isEmpty();
+
+            harness.clearOutputRecords();
+        }
+
+        // Step 4: Send a CreateTableEvent with a column type change (simulating a projection
+        // change that alters a column type). The coordinator should convert this into an
+        // AlterColumnTypeEvent.
+        {
+            Schema schemaV4 =
+                    Schema.newBuilder()
+                            .physicalColumn("id", INT)
+                            .physicalColumn("name", DataTypes.VARCHAR(255))
+                            .physicalColumn("email", STRING)
+                            .primaryKey("id")
+                            .build();
+
+            List<Event> createWithTypeChange =
+                    Collections.singletonList(new CreateTableEvent(tableId, schemaV4));
+
+            processEvent(schemaOperator, createWithTypeChange);
+
+            Assertions.assertThat(harness.getLatestEvolvedSchema(tableId)).isEqualTo(schemaV4);
+
+            List<Event> outputEvents =
+                    harness.getOutputRecords().stream()
+                            .map(StreamRecord::getValue)
+                            .collect(Collectors.toList());
+
+            Assertions.assertThat(outputEvents)
+                    .filteredOn(e -> e instanceof AlterColumnTypeEvent)
+                    .hasSize(1);
+
+            AlterColumnTypeEvent alterColumnTypeEvent =
+                    (AlterColumnTypeEvent)
+                            outputEvents.stream()
+                                    .filter(e -> e instanceof AlterColumnTypeEvent)
+                                    .findFirst()
+                                    .get();
+            Assertions.assertThat(alterColumnTypeEvent.getTypeMapping())
+                    .containsEntry("name", DataTypes.VARCHAR(255));
+
+            // Verify no CreateTableEvent was emitted downstream
+            Assertions.assertThat(outputEvents)
+                    .filteredOn(e -> e instanceof CreateTableEvent)
+                    .isEmpty();
+
+            harness.clearOutputRecords();
+        }
+
+        // Step 5: Send a CreateTableEvent with the same schema (no change). Should be
+        // redundant (caught by isSchemaChangeEventRedundant) and produce no schema change
+        // events.
+        {
+            Schema schemaV4 =
+                    Schema.newBuilder()
+                            .physicalColumn("id", INT)
+                            .physicalColumn("name", DataTypes.VARCHAR(255))
+                            .physicalColumn("email", STRING)
+                            .primaryKey("id")
+                            .build();
+
+            List<Event> createWithSameSchema =
+                    Collections.singletonList(new CreateTableEvent(tableId, schemaV4));
+
+            processEvent(schemaOperator, createWithSameSchema);
+
+            List<Event> outputEvents =
+                    harness.getOutputRecords().stream()
+                            .map(StreamRecord::getValue)
+                            .collect(Collectors.toList());
+
+            // No schema change events should be emitted for an identical schema
+            Assertions.assertThat(outputEvents)
+                    .filteredOn(
+                            e ->
+                                    e instanceof CreateTableEvent
+                                            || e instanceof AddColumnEvent
+                                            || e instanceof DropColumnEvent
+                                            || e instanceof AlterColumnTypeEvent)
+                    .isEmpty();
+
+            harness.clearOutputRecords();
+        }
+    }
+
     private RecordData buildRecord(final Object... args) {
         List<DataType> dataTypes = new ArrayList<>();
         List<Object> objects = new ArrayList<>();
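
For orientation before the E2E patch below: each test submits a YAML pipeline whose transform projection changes across a stop-with-savepoint/restart cycle. Rendered from the buildCustomersProjectionPipeline helper at the end of that patch, the submitted definition looks roughly as follows (hostname, port, credentials, database name, and parallelism are placeholders substituted at runtime):

    source:
      type: mysql
      hostname: <mysql-container-alias>
      port: <mysql-port>
      username: <test-user>
      password: <test-password>
      tables: <database>.customers
      server-id: 5400-5404
      server-time-zone: UTC

    sink:
      type: values

    transform:
      - source-table: <database>.customers
        projection: id, name

    pipeline:
      parallelism: <parallelism>
      schema.change.behavior: evolve
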
"INSERT INTO customers VALUES (106, 'user_6', 'Shenzhen', '123567891236');"); + validateResult( + dbNameFormatter, + "DataChangeEvent{tableId=%s.customers, before=[], after=[106, user_6, Shenzhen], op=INSERT, meta=()}"); + LOG.info("Incremental stage 2 with expanded projection finished successfully."); + + // Cleanup + cancelJob(newJobID); + } + + @Test + void testProjectionShrinkAfterSavepointRestart() throws Exception { + runInContainerAsRoot(jobManager, "chmod", "0777", "-R", "/tmp/cdc/"); + + // Phase 1: Start with wider projection (id, name, address) + String contentV1 = buildCustomersProjectionPipeline("id, name, address"); + JobID jobID = submitPipelineJob(contentV1); + Assertions.assertThat(jobID).isNotNull(); + LOG.info("Submitted Job ID is {} ", jobID); + + // Phase 2: Validate snapshot with wider projection + validateResult( + dbNameFormatter, + "CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai], op=INSERT, meta=()}"); + LOG.info("Snapshot stage finished successfully."); + + // Phase 3: Validate incremental with wider projection + executeMySqlStatements( + mysqlInventoryDatabase, + "INSERT INTO customers VALUES (105, 'user_5', 'Beijing', '123567891235');"); + validateResult( + dbNameFormatter, + "DataChangeEvent{tableId=%s.customers, before=[], after=[105, user_5, Beijing], op=INSERT, meta=()}"); + LOG.info("Incremental stage 1 finished successfully."); + + // Phase 4: Stop with savepoint + String savepointPath = stopJobWithSavepoint(jobID); + LOG.info("Stopped Job {} and created a savepoint at {}.", jobID, savepointPath); + + // Phase 5: Restart with narrower projection (id, name only) + String contentV2 = buildCustomersProjectionPipeline("id, name"); + JobID newJobID = submitPipelineJob(contentV2, savepointPath, true); + LOG.info("Reincarnated Job {} has been submitted successfully.", newJobID); + + // Phase 6: Validate schema evolution - DropColumnEvent for address + validateResult( + dbNameFormatter, + "DropColumnEvent{tableId=%s.customers, droppedColumnNames=[address]}"); + LOG.info("Schema evolution (DropColumn) validated successfully."); + + // Phase 7: Validate new data has only id and name + executeMySqlStatements( + mysqlInventoryDatabase, + "INSERT INTO customers VALUES (106, 'user_6', 'Shenzhen', '123567891236');"); + validateResult( + dbNameFormatter, + "DataChangeEvent{tableId=%s.customers, before=[], after=[106, user_6], op=INSERT, meta=()}"); + LOG.info("Incremental stage 2 with narrowed projection finished successfully."); + + // Cleanup + cancelJob(newJobID); + } + + @Test + void testProjectionColumnTypeChangeAfterSavepointRestart() throws Exception { + runInContainerAsRoot(jobManager, "chmod", "0777", "-R", "/tmp/cdc/"); + + // Phase 1: Start with projection (id, name) where name is VARCHAR(255) NOT NULL + String contentV1 = buildCustomersProjectionPipeline("id, name"); + JobID jobID = submitPipelineJob(contentV1); + Assertions.assertThat(jobID).isNotNull(); + LOG.info("Submitted Job ID is {} ", jobID); + + // Phase 2: Validate snapshot with {id INT, name 
+    @Test
+    void testProjectionShrinkAfterSavepointRestart() throws Exception {
+        runInContainerAsRoot(jobManager, "chmod", "0777", "-R", "/tmp/cdc/");
+
+        // Phase 1: Start with wider projection (id, name, address)
+        String contentV1 = buildCustomersProjectionPipeline("id, name, address");
+        JobID jobID = submitPipelineJob(contentV1);
+        Assertions.assertThat(jobID).isNotNull();
+        LOG.info("Submitted Job ID is {}.", jobID);
+
+        // Phase 2: Validate snapshot with wider projection
+        validateResult(
+                dbNameFormatter,
+                "CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024)}, primaryKeys=id, options=()}",
+                "DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai], op=INSERT, meta=()}");
+        LOG.info("Snapshot stage finished successfully.");
+
+        // Phase 3: Validate incremental with wider projection
+        executeMySqlStatements(
+                mysqlInventoryDatabase,
+                "INSERT INTO customers VALUES (105, 'user_5', 'Beijing', '123567891235');");
+        validateResult(
+                dbNameFormatter,
+                "DataChangeEvent{tableId=%s.customers, before=[], after=[105, user_5, Beijing], op=INSERT, meta=()}");
+        LOG.info("Incremental stage 1 finished successfully.");
+
+        // Phase 4: Stop with savepoint
+        String savepointPath = stopJobWithSavepoint(jobID);
+        LOG.info("Stopped Job {} and created a savepoint at {}.", jobID, savepointPath);
+
+        // Phase 5: Restart with narrower projection (id, name only)
+        String contentV2 = buildCustomersProjectionPipeline("id, name");
+        JobID newJobID = submitPipelineJob(contentV2, savepointPath, true);
+        LOG.info("Reincarnated Job {} has been submitted successfully.", newJobID);
+
+        // Phase 6: Validate schema evolution - DropColumnEvent for address
+        validateResult(
+                dbNameFormatter,
+                "DropColumnEvent{tableId=%s.customers, droppedColumnNames=[address]}");
+        LOG.info("Schema evolution (DropColumn) validated successfully.");
+
+        // Phase 7: Validate new data has only id and name
+        executeMySqlStatements(
+                mysqlInventoryDatabase,
+                "INSERT INTO customers VALUES (106, 'user_6', 'Shenzhen', '123567891236');");
+        validateResult(
+                dbNameFormatter,
+                "DataChangeEvent{tableId=%s.customers, before=[], after=[106, user_6], op=INSERT, meta=()}");
+        LOG.info("Incremental stage 2 with narrowed projection finished successfully.");
+
+        // Cleanup
+        cancelJob(newJobID);
+    }
+
+    @Test
+    void testProjectionColumnTypeChangeAfterSavepointRestart() throws Exception {
+        runInContainerAsRoot(jobManager, "chmod", "0777", "-R", "/tmp/cdc/");
+
+        // Phase 1: Start with projection (id, name) where name is VARCHAR(255) NOT NULL
+        String contentV1 = buildCustomersProjectionPipeline("id, name");
+        JobID jobID = submitPipelineJob(contentV1);
+        Assertions.assertThat(jobID).isNotNull();
+        LOG.info("Submitted Job ID is {}.", jobID);
+
+        // Phase 2: Validate snapshot with {id INT, name VARCHAR(255)} schema
+        validateResult(
+                dbNameFormatter,
+                "CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink'}, primaryKeys=id, options=()}",
+                "DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4], op=INSERT, meta=()}");
+        LOG.info("Snapshot stage finished successfully.");
+
+        // Phase 3: Stop with savepoint
+        String savepointPath = stopJobWithSavepoint(jobID);
+        LOG.info("Stopped Job {} and created a savepoint at {}.", jobID, savepointPath);
+
+        // Phase 4: Restart with type-changed projection (CAST name to VARCHAR(100))
+        String contentV2 =
+                buildCustomersProjectionPipeline("id, CAST(name AS VARCHAR(100)) AS name");
+        JobID newJobID = submitPipelineJob(contentV2, savepointPath, true);
+        LOG.info("Reincarnated Job {} has been submitted successfully.", newJobID);
+
+        // Phase 5: Validate schema evolution - AlterColumnTypeEvent for name column
+        validateResult(
+                dbNameFormatter,
+                "AlterColumnTypeEvent{tableId=%s.customers, typeMapping={name=VARCHAR(100)}, oldTypeMapping={name=VARCHAR(255) NOT NULL}}");
+        LOG.info("Schema evolution (AlterColumnType) validated successfully.");
+
+        // Phase 6: Validate new data flows correctly with changed type
+        executeMySqlStatements(
+                mysqlInventoryDatabase,
+                "INSERT INTO customers VALUES (105, 'user_5', 'Beijing', '123567891235');");
+        validateResult(
+                dbNameFormatter,
+                "DataChangeEvent{tableId=%s.customers, before=[], after=[105, user_5], op=INSERT, meta=()}");
+        LOG.info("Incremental stage with type-changed projection finished successfully.");
+
+        // Cleanup
+        cancelJob(newJobID);
+    }
+
{}.", jobID, savepointPath); + + // Phase 5: Restart with the exact same projection (id, name) + String contentV2 = buildCustomersProjectionPipeline("id, name"); + JobID newJobID = submitPipelineJob(contentV2, savepointPath, true); + LOG.info("Reincarnated Job {} has been submitted successfully.", newJobID); + + // Phase 6: Insert a new row and validate only the DataChangeEvent appears + // No AddColumnEvent, DropColumnEvent, or AlterColumnTypeEvent should be emitted + // because the projection is identical — isSchemaChangeEventRedundant() suppresses them + executeMySqlStatements( + mysqlInventoryDatabase, + "INSERT INTO customers VALUES (106, 'user_6', 'Shenzhen', '123567891236');"); + validateResult( + dbNameFormatter, + "DataChangeEvent{tableId=%s.customers, before=[], after=[106, user_6], op=INSERT, meta=()}"); + LOG.info( + "Incremental stage 2 with identical projection finished successfully - no schema change events emitted."); + + // Cleanup + cancelJob(newJobID); + } + + private String buildCustomersProjectionPipeline(String projection) { + return String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: %d\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.customers\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: values\n" + + "\n" + + "transform:\n" + + " - source-table: %s.customers\n" + + " projection: %s\n" + + "\n" + + "pipeline:\n" + + " parallelism: %d\n" + + " schema.change.behavior: evolve\n", + INTER_CONTAINER_MYSQL_ALIAS, + MySqlContainer.MYSQL_PORT, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + mysqlInventoryDatabase.getDatabaseName(), + mysqlInventoryDatabase.getDatabaseName(), + projection, + parallelism); + } + private void generateIncrementalEventsPhaseOne() { executeMySqlStatements( mysqlInventoryDatabase,