Skip to content

Commit d24c3fa

Browse files
authored
Few updates to RDS source plugin (#5602)
* Remove step to clean up replication slot Signed-off-by: Hai Yan <oeyh@amazon.com> * Update table filter config Signed-off-by: Hai Yan <oeyh@amazon.com> * Fix query for mysql 8.4 Signed-off-by: Hai Yan <oeyh@amazon.com> --------- Signed-off-by: Hai Yan <oeyh@amazon.com>
1 parent 9a4628b commit d24c3fa

13 files changed

Lines changed: 232 additions & 169 deletions

File tree

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
1515
import org.opensearch.dataprepper.model.record.Record;
1616
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
17-
import org.opensearch.dataprepper.plugins.source.rds.configuration.TableFilterConfig;
1817
import org.opensearch.dataprepper.plugins.source.rds.export.DataFileScheduler;
1918
import org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler;
2019
import org.opensearch.dataprepper.plugins.source.rds.export.ExportTaskManager;
@@ -213,10 +212,9 @@ private DbTableMetadata getDbTableMetadata(final DbMetadata dbMetadata, final Sc
213212
}
214213

215214
private Map<String, Map<String, String>> getColumnDataTypeMap(final SchemaManager schemaManager) {
216-
TableFilterConfig tableFilterConfig = sourceConfig.getTables();
217-
Set<String> tableNames = schemaManager.getTableNames(tableFilterConfig.getDatabase());
218-
tableFilterConfig.applyTableFilter(tableNames);
219-
LOG.info("These tables will be include in processing: {}", tableNames);
215+
Set<String> tableNames = schemaManager.getTableNames(sourceConfig.getDatabase());
216+
sourceConfig.applyTableFilter(tableNames);
217+
LOG.info("These tables will be included in processing: {}", tableNames);
220218
return schemaManager.getColumnDataTypes(new ArrayList<>(tableNames));
221219
}
222220
}

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceConfig.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import jakarta.validation.constraints.Max;
1111
import jakarta.validation.constraints.Min;
1212
import jakarta.validation.constraints.NotBlank;
13+
import jakarta.validation.constraints.NotEmpty;
1314
import jakarta.validation.constraints.NotNull;
1415
import org.opensearch.dataprepper.plugins.source.rds.configuration.AwsAuthenticationConfig;
1516
import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType;
@@ -19,6 +20,9 @@
1920
import software.amazon.awssdk.regions.Region;
2021

2122
import java.time.Duration;
23+
import java.util.List;
24+
import java.util.Set;
25+
import java.util.stream.Collectors;
2226

2327
/**
2428
* Configuration for RDS Source
@@ -43,6 +47,10 @@ public class RdsSourceConfig {
4347
@NotNull
4448
private EngineType engine;
4549

50+
@JsonProperty("database")
51+
@NotEmpty
52+
private String database;
53+
4654
@JsonProperty("tables")
4755
private TableFilterConfig tableFilterConfig;
4856

@@ -111,6 +119,10 @@ public boolean isAurora() {
111119
return engine.isAurora();
112120
}
113121

122+
public String getDatabase() {
123+
return database;
124+
}
125+
114126
public TableFilterConfig getTables() {
115127
return tableFilterConfig;
116128
}
@@ -190,4 +202,29 @@ public String getPassword() {
190202
return password;
191203
}
192204
}
205+
206+
/**
207+
* This method applies the table filter configuration to the given set of table names.
208+
*
209+
* @param tableNames The set of table names to be filtered
210+
*/
211+
public void applyTableFilter(Set<String> tableNames) {
212+
if (tableFilterConfig == null) {
213+
return;
214+
}
215+
216+
if (!tableFilterConfig.getInclude().isEmpty()) {
217+
List<String> includeTableList = tableFilterConfig.getInclude().stream()
218+
.map(item -> getDatabase() + "." + item)
219+
.collect(Collectors.toList());
220+
tableNames.retainAll(includeTableList);
221+
}
222+
223+
if (!tableFilterConfig.getExclude().isEmpty()) {
224+
List<String> excludeTableList = tableFilterConfig.getExclude().stream()
225+
.map(item -> getDatabase() + "." + item)
226+
.collect(Collectors.toList());
227+
excludeTableList.forEach(tableNames::remove);
228+
}
229+
}
193230
}

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/configuration/TableFilterConfig.java

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,48 +10,20 @@
1010
package org.opensearch.dataprepper.plugins.source.rds.configuration;
1111

1212
import com.fasterxml.jackson.annotation.JsonProperty;
13-
import jakarta.validation.constraints.NotEmpty;
1413
import jakarta.validation.constraints.Size;
1514
import lombok.Getter;
1615

1716
import java.util.Collections;
1817
import java.util.List;
19-
import java.util.Set;
20-
import java.util.stream.Collectors;
2118

2219
@Getter
2320
public class TableFilterConfig {
2421

25-
@JsonProperty("database")
26-
@NotEmpty
27-
private String database;
28-
2922
@JsonProperty("include")
3023
@Size(max = 1000, message = "Table filter list should not be more than 1000")
3124
private List<String> include = Collections.emptyList();
3225

3326
@JsonProperty("exclude")
3427
@Size(max = 1000, message = "Table filter list should not be more than 1000")
3528
private List<String> exclude = Collections.emptyList();
36-
37-
/**
38-
* This method applies the table filter configuration to the given set of table names.
39-
*
40-
* @param tableNames The set of table names to be filtered
41-
*/
42-
public void applyTableFilter(Set<String> tableNames) {
43-
if (!getInclude().isEmpty()) {
44-
List<String> includeTableList = getInclude().stream()
45-
.map(item -> getDatabase() + "." + item)
46-
.collect(Collectors.toList());
47-
tableNames.retainAll(includeTableList);
48-
}
49-
50-
if (!getExclude().isEmpty()) {
51-
List<String> excludeTableList = getExclude().stream()
52-
.map(item -> getDatabase() + "." + item)
53-
.collect(Collectors.toList());
54-
excludeTableList.forEach(tableNames::remove);
55-
}
56-
}
5729
}

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
99
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
1010
import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
11-
import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType;
1211
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ExportPartition;
1312
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState;
1413
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.LeaderPartition;
@@ -116,21 +115,6 @@ public void run() {
116115

117116
public void shutdown() {
118117
shutdownRequested = true;
119-
120-
// Clean up publication and replication slot for Postgres
121-
if (streamPartition != null) {
122-
streamPartition.getProgressState().ifPresent(progressState -> {
123-
if (EngineType.fromString(progressState.getEngineType()).isPostgres()) {
124-
final PostgresStreamState postgresStreamState = progressState.getPostgresStreamState();
125-
final String publicationName = postgresStreamState.getPublicationName();
126-
final String replicationSlotName = postgresStreamState.getReplicationSlotName();
127-
LOG.info("Cleaned up logical replication slot {} and publication {}",
128-
replicationSlotName, publicationName);
129-
((PostgresSchemaManager) schemaManager).deleteLogicalReplicationSlot(
130-
publicationName, replicationSlotName);
131-
}
132-
});
133-
}
134118
}
135119

136120
private void init() {

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/ConnectionManagerFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,6 @@ public ConnectionManager getConnectionManager() {
3838
sourceConfig.getAuthenticationConfig().getUsername(),
3939
sourceConfig.getAuthenticationConfig().getPassword(),
4040
sourceConfig.isTlsEnabled(),
41-
sourceConfig.getTables().getDatabase());
41+
sourceConfig.getDatabase());
4242
}
4343
}

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/MySqlSchemaManager.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ public class MySqlSchemaManager implements SchemaManager {
3636
static final String[] TABLE_TYPES = new String[]{"TABLE"};
3737
static final String COLUMN_NAME = "COLUMN_NAME";
3838
static final String TABLE_NAME = "TABLE_NAME";
39+
static final String MYSQL_VERSION_8_4 = "8.4";
3940
static final String BINLOG_STATUS_QUERY = "SHOW MASTER STATUS";
41+
static final String NEW_BINLOG_STATUS_QUERY = "SHOW BINARY LOG STATUS";
4042
static final String BINLOG_FILE = "File";
4143
static final String BINLOG_POSITION = "Position";
4244
static final int NUM_OF_RETRIES = 3;
@@ -153,8 +155,11 @@ public Optional<BinlogCoordinate> getCurrentBinaryLogPosition() {
153155
int retry = 0;
154156
while (retry <= NUM_OF_RETRIES) {
155157
try (final Connection connection = connectionManager.getConnection()) {
158+
final String mySqlVersion = connection.getMetaData().getDatabaseProductVersion();
156159
final Statement statement = connection.createStatement();
157-
final ResultSet rs = statement.executeQuery(BINLOG_STATUS_QUERY);
160+
final ResultSet rs = VersionUtil.compareVersions(mySqlVersion, MYSQL_VERSION_8_4) >= 0 ?
161+
statement.executeQuery(NEW_BINLOG_STATUS_QUERY) :
162+
statement.executeQuery(BINLOG_STATUS_QUERY);
158163
if (rs.next()) {
159164
return Optional.of(new BinlogCoordinate(rs.getString(BINLOG_FILE), rs.getLong(BINLOG_POSITION)));
160165
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins.source.rds.schema;
11+
12+
public class VersionUtil {
13+
/* Compares two version strings.
14+
* Returns -1 if version1 is less than version2, 0 if they are equal, and 1 if version1 is greater than version2.
15+
*/
16+
public static int compareVersions(String version1, String version2) {
17+
String[] v1Parts = version1.split("\\.");
18+
String[] v2Parts = version2.split("\\.");
19+
20+
int maxLength = Math.max(v1Parts.length, v2Parts.length);
21+
22+
for (int i = 0; i < maxLength; i++) {
23+
int v1Part = i < v1Parts.length ? Integer.parseInt(v1Parts[i]) : 0;
24+
int v2Part = i < v2Parts.length ? Integer.parseInt(v2Parts[i]) : 0;
25+
26+
if (v1Part < v2Part) {
27+
return -1;
28+
}
29+
if (v1Part > v2Part) {
30+
return 1;
31+
}
32+
}
33+
return 0;
34+
}
35+
}

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ void processBeginMessage(ByteBuffer msg) {
226226

227227
void processRelationMessage(ByteBuffer msg) {
228228
final long tableId = msg.getInt();
229-
String databaseName = sourceConfig.getTables().getDatabase();
229+
String databaseName = sourceConfig.getDatabase();
230230
String schemaName = getNullTerminatedString(msg);
231231
String tableName = getNullTerminatedString(msg);
232232

data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsServiceTest.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.opensearch.dataprepper.model.record.Record;
2424
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
2525
import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType;
26-
import org.opensearch.dataprepper.plugins.source.rds.configuration.TableFilterConfig;
2726
import org.opensearch.dataprepper.plugins.source.rds.configuration.TlsConfig;
2827
import org.opensearch.dataprepper.plugins.source.rds.export.DataFileScheduler;
2928
import org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler;
@@ -196,13 +195,11 @@ private void prepareMocks() {
196195
.build())
197196
.build())
198197
.build();
199-
final TableFilterConfig tableFilterConfig = mock(TableFilterConfig.class);
200198
final String databaseName = UUID.randomUUID().toString();
201199
final Set<String> tableNames = Set.of("database1.table1", "database2.table2");
202200

203201
when(sourceConfig.getDbIdentifier()).thenReturn(dbIdentifier);
204-
when(sourceConfig.getTables()).thenReturn(tableFilterConfig);
205-
when(tableFilterConfig.getDatabase()).thenReturn(databaseName);
202+
when(sourceConfig.getDatabase()).thenReturn(databaseName);
206203
when(schemaManager.getTableNames(databaseName)).thenReturn(tableNames);
207204
when(schemaManager.getColumnDataTypes(new ArrayList<>(tableNames))).thenReturn(mock(Map.class));
208205
when(rdsClient.describeDBInstances(any(DescribeDbInstancesRequest.class))).thenReturn(describeDbInstancesResponse);

0 commit comments

Comments
 (0)