Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCHEMA_CHANGE_ENABLED;
Expand Down Expand Up @@ -133,6 +134,7 @@ public DataSource createDataSource(Context context) {
int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
boolean tableIdIncludeDatabase = config.get(TABLE_ID_INCLUDE_DATABASE);
boolean includeSchemaChanges = config.get(SCHEMA_CHANGE_ENABLED);
boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
Comment thread
cansakiroglu marked this conversation as resolved.

validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
Expand Down Expand Up @@ -175,6 +177,7 @@ public DataSource createDataSource(Context context) {
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
.includeDatabaseInTableId(tableIdIncludeDatabase)
.includeSchemaChanges(includeSchemaChanges)
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.getConfigFactory();

List<TableId> tableIds = PostgresSchemaUtils.listTables(configFactory.create(0), null);
Expand Down Expand Up @@ -266,6 +269,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
options.add(TABLE_ID_INCLUDE_DATABASE);
options.add(SCHEMA_CHANGE_ENABLED);
options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,4 +281,16 @@ public class PostgresDataSourceOptions {
.defaultValue(false)
.withDescription(
"Whether to infer CDC column types when processing pgoutput Relation messages.");

public static final ConfigOption<Boolean> SCAN_NEWLY_ADDED_TABLE_ENABLED =
ConfigOptions.key("scan.newly-added-table.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to scan the newly added tables or not. Defaults to false. "
+ "This option only takes effect when restoring from a savepoint or checkpoint, "
+ "and enables the existing SnapshotSplitAssigner#captureNewlyAddedTables() code path "
+ "to discover tables that match the source `tables:` pattern but were not part of "
+ "the captured set at savepoint time. Mirrors the MySQL Pipeline connector option "
+ "of the same name.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.HOSTNAME;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.PG_PORT;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SLOT_NAME;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES_EXCLUDE;
Expand Down Expand Up @@ -97,6 +98,25 @@ public void testCreateDataSource() {
.isEqualTo(Arrays.asList("inventory.products"));
}

@Test
public void testScanNewlyAddedTableEnabled() {
Map<String, String> options = new HashMap<>();
options.put(HOSTNAME.key(), POSTGRES_CONTAINER.getHost());
options.put(
PG_PORT.key(), String.valueOf(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT)));
options.put(USERNAME.key(), TEST_USER);
options.put(PASSWORD.key(), TEST_PASSWORD);
options.put(TABLES.key(), POSTGRES_CONTAINER.getDatabaseName() + ".inventory.prod\\.*");
options.put(SLOT_NAME.key(), slotName);
options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true");

Factory.Context context = new MockContext(Configuration.fromMap(options));
PostgresDataSourceFactory factory = new PostgresDataSourceFactory();
PostgresDataSource dataSource = (PostgresDataSource) factory.createDataSource(context);

assertThat(dataSource.getPostgresSourceConfig().isScanNewlyAddedTableEnabled()).isTrue();
}

@Test
public void testNoMatchedTable() {
Map<String, String> options = new HashMap<>();
Expand Down