diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java index 93d8657cf56..ace3b311378 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java @@ -72,6 +72,7 @@ import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_STARTUP_MODE; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCHEMA_CHANGE_ENABLED; @@ -133,6 +134,7 @@ public DataSource createDataSource(Context context) { int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY); boolean tableIdIncludeDatabase = config.get(TABLE_ID_INCLUDE_DATABASE); boolean includeSchemaChanges = config.get(SCHEMA_CHANGE_ENABLED); + boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED); validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1); @@ -175,6 +177,7 @@ public DataSource createDataSource(Context context) { .assignUnboundedChunkFirst(isAssignUnboundedChunkFirst) .includeDatabaseInTableId(tableIdIncludeDatabase) .includeSchemaChanges(includeSchemaChanges) + .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled) .getConfigFactory(); List tableIds = PostgresSchemaUtils.listTables(configFactory.create(0), null); @@ -266,6 +269,7 @@ public Set> optionalOptions() { options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED); options.add(TABLE_ID_INCLUDE_DATABASE); options.add(SCHEMA_CHANGE_ENABLED); + options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED); return options; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java index 95cc823d211..183d7cc89f4 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java @@ -281,4 +281,16 @@ public class PostgresDataSourceOptions { .defaultValue(false) .withDescription( "Whether to infer CDC column types when processing pgoutput Relation messages."); + + public static final ConfigOption SCAN_NEWLY_ADDED_TABLE_ENABLED = + ConfigOptions.key("scan.newly-added-table.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to scan the newly added tables or not. Defaults to false. " + + "This option only takes effect when restoring from a savepoint or checkpoint, " + + "and enables the existing SnapshotSplitAssigner#captureNewlyAddedTables() code path " + + "to discover tables that match the source `tables:` pattern but were not part of " + + "the captured set at savepoint time. Mirrors the MySQL Pipeline connector option " + + "of the same name."); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java index c3cce43bc97..f64a0935b60 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java @@ -48,6 +48,7 @@ import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.HOSTNAME; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.PASSWORD; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.PG_PORT; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SLOT_NAME; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES_EXCLUDE; @@ -97,6 +98,25 @@ public void testCreateDataSource() { .isEqualTo(Arrays.asList("inventory.products")); } + @Test + public void testScanNewlyAddedTableEnabled() { + Map options = new HashMap<>(); + options.put(HOSTNAME.key(), POSTGRES_CONTAINER.getHost()); + options.put( + PG_PORT.key(), String.valueOf(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT))); + options.put(USERNAME.key(), TEST_USER); + options.put(PASSWORD.key(), TEST_PASSWORD); + options.put(TABLES.key(), POSTGRES_CONTAINER.getDatabaseName() + ".inventory.prod\\.*"); + options.put(SLOT_NAME.key(), slotName); + options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true"); + + Factory.Context context = new MockContext(Configuration.fromMap(options)); + PostgresDataSourceFactory factory = new PostgresDataSourceFactory(); + PostgresDataSource dataSource = (PostgresDataSource) factory.createDataSource(context); + + assertThat(dataSource.getPostgresSourceConfig().isScanNewlyAddedTableEnabled()).isTrue(); + } + @Test public void testNoMatchedTable() { Map options = new HashMap<>();