diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java index 1915aca809c..d2d47d709fd 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java @@ -98,6 +98,7 @@ protected void executeDataSnapshot(Context context) throws Exception { PostgresSnapshotSplitReadTask snapshotSplitReadTask = new PostgresSnapshotSplitReadTask( + (PostgresSourceConfig) ctx.getSourceConfig(), ctx.getConnection(), ctx.getDbzConnectorConfig(), ctx.getDatabaseSchema(), @@ -214,7 +215,7 @@ public static class PostgresSnapshotSplitReadTask LoggerFactory.getLogger(PostgresSnapshotSplitReadTask.class); private final PostgresConnection jdbcConnection; - private final PostgresConnectorConfig connectorConfig; + private final PostgresSourceConfig sourceConfig; private final PostgresEventDispatcher eventDispatcher; private final SnapshotSplit snapshotSplit; private final PostgresOffsetContext offsetContext; @@ -223,6 +224,7 @@ public static class PostgresSnapshotSplitReadTask private final Clock clock; public PostgresSnapshotSplitReadTask( + PostgresSourceConfig sourceConfig, PostgresConnection jdbcConnection, PostgresConnectorConfig connectorConfig, PostgresSchema databaseSchema, @@ -232,7 +234,7 @@ public PostgresSnapshotSplitReadTask( SnapshotSplit snapshotSplit) { super(connectorConfig, snapshotProgressListener); this.jdbcConnection = jdbcConnection; - this.connectorConfig = connectorConfig; + this.sourceConfig = sourceConfig; this.snapshotProgressListener = snapshotProgressListener; this.databaseSchema = databaseSchema; this.eventDispatcher = eventDispatcher; @@ -314,7 +316,7 @@ private void createDataEventsForTable( snapshotSplit.getSplitStart(), snapshotSplit.getSplitEnd(), snapshotSplit.getSplitKeyType().getFieldCount(), - connectorConfig.getSnapshotFetchSize()); + sourceConfig.getFetchSize()); ResultSet rs = selectStatement.executeQuery()) { ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java index 8406b98bbba..eb9834b1744 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java @@ -55,7 +55,6 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; -import java.util.Properties; import static org.assertj.core.api.Assertions.assertThat; @@ -300,14 +299,10 @@ void testSnapshotFetchSize() throws Exception { PostgresSourceConfigFactory sourceConfigFactory = getMockPostgresSourceConfigFactory( customDatabase, schemaName, tableName, null, 10, true); - Properties properties = new Properties(); - properties.setProperty("snapshot.fetch.size", "2"); - sourceConfigFactory.debeziumProperties(properties); + sourceConfigFactory.fetchSize(2); PostgresSourceConfig sourceConfig = sourceConfigFactory.create(0); PostgresDialect postgresDialect = new PostgresDialect(sourceConfigFactory.create(0)); SnapshotSplit snapshotSplit = getSnapshotSplits(sourceConfig, postgresDialect).get(0); - PostgresSourceFetchTaskContext postgresSourceFetchTaskContext = - new PostgresSourceFetchTaskContext(sourceConfig, postgresDialect); final String selectSql = PostgresQueryUtils.buildSplitScanQuery( snapshotSplit.getTableId(), @@ -326,9 +321,7 @@ void testSnapshotFetchSize() throws Exception { snapshotSplit.getSplitStart(), snapshotSplit.getSplitEnd(), snapshotSplit.getSplitKeyType().getFieldCount(), - postgresSourceFetchTaskContext - .getDbzConnectorConfig() - .getSnapshotFetchSize()); + sourceConfig.getFetchSize()); ResultSet rs = selectStatement.executeQuery()) { assertThat(rs.getFetchSize()).isEqualTo(2); }