Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ protected void executeDataSnapshot(Context context) throws Exception {

PostgresSnapshotSplitReadTask snapshotSplitReadTask =
new PostgresSnapshotSplitReadTask(
(PostgresSourceConfig) ctx.getSourceConfig(),
ctx.getConnection(),
ctx.getDbzConnectorConfig(),
ctx.getDatabaseSchema(),
Expand Down Expand Up @@ -214,7 +215,7 @@ public static class PostgresSnapshotSplitReadTask
LoggerFactory.getLogger(PostgresSnapshotSplitReadTask.class);

private final PostgresConnection jdbcConnection;
private final PostgresConnectorConfig connectorConfig;
private final PostgresSourceConfig sourceConfig;
private final PostgresEventDispatcher<TableId> eventDispatcher;
private final SnapshotSplit snapshotSplit;
private final PostgresOffsetContext offsetContext;
Expand All @@ -223,6 +224,7 @@ public static class PostgresSnapshotSplitReadTask
private final Clock clock;

public PostgresSnapshotSplitReadTask(
PostgresSourceConfig sourceConfig,
PostgresConnection jdbcConnection,
PostgresConnectorConfig connectorConfig,
PostgresSchema databaseSchema,
Expand All @@ -232,7 +234,7 @@ public PostgresSnapshotSplitReadTask(
SnapshotSplit snapshotSplit) {
super(connectorConfig, snapshotProgressListener);
this.jdbcConnection = jdbcConnection;
this.connectorConfig = connectorConfig;
this.sourceConfig = sourceConfig;
this.snapshotProgressListener = snapshotProgressListener;
this.databaseSchema = databaseSchema;
this.eventDispatcher = eventDispatcher;
Expand Down Expand Up @@ -314,7 +316,7 @@ private void createDataEventsForTable(
snapshotSplit.getSplitStart(),
snapshotSplit.getSplitEnd(),
snapshotSplit.getSplitKeyType().getFieldCount(),
connectorConfig.getSnapshotFetchSize());
sourceConfig.getFetchSize());
ResultSet rs = selectStatement.executeQuery()) {

ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -300,14 +299,10 @@ void testSnapshotFetchSize() throws Exception {
PostgresSourceConfigFactory sourceConfigFactory =
getMockPostgresSourceConfigFactory(
customDatabase, schemaName, tableName, null, 10, true);
Properties properties = new Properties();
properties.setProperty("snapshot.fetch.size", "2");
sourceConfigFactory.debeziumProperties(properties);
sourceConfigFactory.fetchSize(2);
PostgresSourceConfig sourceConfig = sourceConfigFactory.create(0);
PostgresDialect postgresDialect = new PostgresDialect(sourceConfigFactory.create(0));
SnapshotSplit snapshotSplit = getSnapshotSplits(sourceConfig, postgresDialect).get(0);
PostgresSourceFetchTaskContext postgresSourceFetchTaskContext =
new PostgresSourceFetchTaskContext(sourceConfig, postgresDialect);
final String selectSql =
PostgresQueryUtils.buildSplitScanQuery(
snapshotSplit.getTableId(),
Expand All @@ -326,9 +321,7 @@ void testSnapshotFetchSize() throws Exception {
snapshotSplit.getSplitStart(),
snapshotSplit.getSplitEnd(),
snapshotSplit.getSplitKeyType().getFieldCount(),
postgresSourceFetchTaskContext
.getDbzConnectorConfig()
.getSnapshotFetchSize());
sourceConfig.getFetchSize());
ResultSet rs = selectStatement.executeQuery()) {
assertThat(rs.getFetchSize()).isEqualTo(2);
}
Expand Down
Loading