From e49d2d464171ddabc7ad67b6b8d2ebb4c82ef46a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mehmet=20Can=20=C5=9Eakiro=C4=9Flu?= Date: Thu, 4 Jun 2026 12:42:34 +0300 Subject: [PATCH 1/2] Expose scan.newly-added-table.enabled option MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add the scan.newly-added-table.enabled YAML option to the Postgres Pipeline connector. The underlying SnapshotSplitAssigner.captureNewlyAddedTables() mechanism + PostgresSourceBuilder.scanNewlyAddedTableEnabled() builder method already exist in the postgres-cdc source; this PR adds the missing YAML-side wiring. Mirrors the same option already exposed by the MySQL Pipeline connector (MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED). Default is false, so the change is no-op for existing pipelines. When set to true, restoring from a savepoint will discover tables that match the source tables: pattern but were not part of the captured set at savepoint time — enabling DMS-style 'add a new table without re-snapshotting existing tables' workflows. Signed-off-by: Mehmet Can Şakiroğlu --- .../postgres/factory/PostgresDataSourceFactory.java | 4 ++++ .../postgres/source/PostgresDataSourceOptions.java | 13 +++++++++++++ 2 files changed, 17 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java index 93d8657cf56..ace3b311378 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java @@ -72,6 +72,7 @@ import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_STARTUP_MODE; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCHEMA_CHANGE_ENABLED; @@ -133,6 +134,7 @@ public DataSource createDataSource(Context context) { int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY); boolean tableIdIncludeDatabase = config.get(TABLE_ID_INCLUDE_DATABASE); boolean includeSchemaChanges = config.get(SCHEMA_CHANGE_ENABLED); + boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED); validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1); @@ -175,6 +177,7 @@ public DataSource createDataSource(Context context) { .assignUnboundedChunkFirst(isAssignUnboundedChunkFirst) .includeDatabaseInTableId(tableIdIncludeDatabase) .includeSchemaChanges(includeSchemaChanges) + .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled) .getConfigFactory(); List tableIds = PostgresSchemaUtils.listTables(configFactory.create(0), null); @@ -266,6 +269,7 @@ public Set> optionalOptions() { options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED); options.add(TABLE_ID_INCLUDE_DATABASE); options.add(SCHEMA_CHANGE_ENABLED); + options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED); return options; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java index 95cc823d211..a1e6b619dfb 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java @@ -281,4 +281,17 @@ public class PostgresDataSourceOptions { .defaultValue(false) .withDescription( "Whether to infer CDC column types when processing pgoutput Relation messages."); + + @Experimental + public static final ConfigOption SCAN_NEWLY_ADDED_TABLE_ENABLED = + ConfigOptions.key("scan.newly-added-table.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to scan the newly added tables or not. Defaults to false. " + + "This option only takes effect when restoring from a savepoint or checkpoint, " + + "and enables the existing SnapshotSplitAssigner#captureNewlyAddedTables() code path " + + "to discover tables that match the source `tables:` pattern but were not part of " + + "the captured set at savepoint time. Mirrors the MySQL Pipeline connector option " + + "of the same name."); } From becc26020c247a78fcd84a474506c7a4fd3711a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mehmet=20Can=20=C5=9Eakiro=C4=9Flu?= Date: Mon, 15 Jun 2026 18:24:20 +0300 Subject: [PATCH 2/2] Drop @Experimental + add factory test for scan.newly-added-table.enabled --- .../source/PostgresDataSourceOptions.java | 1 - .../PostgresDataSourceFactoryTest.java | 20 +++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java index a1e6b619dfb..183d7cc89f4 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java @@ -282,7 +282,6 @@ public class PostgresDataSourceOptions { .withDescription( "Whether to infer CDC column types when processing pgoutput Relation messages."); - @Experimental public static final ConfigOption SCAN_NEWLY_ADDED_TABLE_ENABLED = ConfigOptions.key("scan.newly-added-table.enabled") .booleanType() diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java index c3cce43bc97..f64a0935b60 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java @@ -48,6 +48,7 @@ import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.HOSTNAME; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.PASSWORD; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.PG_PORT; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SLOT_NAME; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES_EXCLUDE; @@ -97,6 +98,25 @@ public void testCreateDataSource() { .isEqualTo(Arrays.asList("inventory.products")); } + @Test + public void testScanNewlyAddedTableEnabled() { + Map options = new HashMap<>(); + options.put(HOSTNAME.key(), POSTGRES_CONTAINER.getHost()); + options.put( + PG_PORT.key(), String.valueOf(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT))); + options.put(USERNAME.key(), TEST_USER); + options.put(PASSWORD.key(), TEST_PASSWORD); + options.put(TABLES.key(), POSTGRES_CONTAINER.getDatabaseName() + ".inventory.prod\\.*"); + options.put(SLOT_NAME.key(), slotName); + options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true"); + + Factory.Context context = new MockContext(Configuration.fromMap(options)); + PostgresDataSourceFactory factory = new PostgresDataSourceFactory(); + PostgresDataSource dataSource = (PostgresDataSource) factory.createDataSource(context); + + assertThat(dataSource.getPostgresSourceConfig().isScanNewlyAddedTableEnabled()).isTrue(); + } + @Test public void testNoMatchedTable() { Map options = new HashMap<>();