Skip to content

Commit 11d2770

Browse files
authored
Adding enum and range datatype support for postgres (#5458)
Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com> Adding enum and range datatype support for postgres Adding enum and range datatype support for postgres Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com> Adding enum and range datatype support for postgres Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com> Adding enum and range datatype support for postgres Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com> Adding enum and range datatype support for postgres Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com> Adding enum and range datatype support for postgres
1 parent a004121 commit 11d2770

13 files changed

Lines changed: 522 additions & 23 deletions

File tree

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/PostgresStreamState.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111

1212
import com.fasterxml.jackson.annotation.JsonProperty;
1313

14+
import java.util.Map;
15+
import java.util.Set;
16+
1417
public class PostgresStreamState {
1518

1619
@JsonProperty("currentLsn")
@@ -22,6 +25,9 @@ public class PostgresStreamState {
2225
@JsonProperty("replicationSlotName")
2326
private String replicationSlotName;
2427

28+
@JsonProperty("enumColumnsByTable")
29+
private Map<String, Set<String>> enumColumnsByTable;
30+
2531
public String getCurrentLsn() {
2632
return currentLsn;
2733
}
@@ -45,4 +51,12 @@ public String getReplicationSlotName() {
4551
public void setReplicationSlotName(String replicationSlotName) {
4652
this.replicationSlotName = replicationSlotName;
4753
}
54+
55+
public Map<String, Set<String>> getEnumColumnsByTable() {
56+
return enumColumnsByTable;
57+
}
58+
59+
public void setEnumColumnsByTable(Map<String, Set<String>> enumColumnsByTable) {
60+
this.enumColumnsByTable = enumColumnsByTable;
61+
}
4862
}

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/ColumnType.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public enum ColumnType {
3232
INTERVAL(1186, "interval"),
3333
JSON(114, "json"),
3434
JSONB(3802, "jsonb"),
35+
JSONPATH(4072, "jsonpath"),
3536
MONEY(790,"money"),
3637
BIT(1560, "bit"),
3738
VARBIT(1562, "varbit"),
@@ -49,11 +50,19 @@ public enum ColumnType {
4950
XML(142, "xml"),
5051
UUID(2950, "uuid"),
5152
PG_LSN(3220, "pg_lsn"),
53+
PG_SNAPSHOT(5038, "pg_snapshot"),
54+
TXID_SNAPSHOT(2970, "txid_snapshot"),
5255
TSVECTOR(3614, "tsvector"),
5356
TSQUERY(3615, "tsquery"),
54-
BYTEA(17, "bytea");
55-
57+
BYTEA(17, "bytea"),
58+
INT4RANGE(3904, "int4range"),
59+
INT8RANGE(3926, "int8range"),
60+
TSRANGE(3908, "tsrange"),
61+
TSTZRANGE(3910, "tstzrange"),
62+
DATERANGE(3912, "daterange"),
63+
ENUM(-1,"enum");
5664

65+
public static final int ENUM_TYPE_ID = -1;
5766
private final int typeId;
5867
private final String typeName;
5968

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/PostgresDataType.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public enum PostgresDataType {
2020
TEXT("text", DataCategory.STRING),
2121
VARCHAR("varchar", DataCategory.STRING),
2222
BPCHAR("bpchar", DataCategory.STRING),
23+
ENUM("enum", DataCategory.STRING),
2324

2425
//Bit String Data type
2526
BIT("bit",DataCategory.BIT_STRING),
@@ -28,6 +29,7 @@ public enum PostgresDataType {
2829
//Json Data type
2930
JSON("json",DataCategory.JSON),
3031
JSONB("jsonb",DataCategory.JSON),
32+
JSONPATH("jsonpath", DataCategory.JSON),
3133

3234
//Boolean data type
3335
BOOLEAN("bool", DataCategory.BOOLEAN),
@@ -61,6 +63,14 @@ public enum PostgresDataType {
6163
PG_LSN("pg_lsn", DataCategory.SPECIAL),
6264
TSVECTOR("tsvector", DataCategory.SPECIAL),
6365
TSQUERY("tsquery", DataCategory.SPECIAL),
66+
PG_SNAPSHOT("pg_snapshot", DataCategory.SPECIAL),
67+
TXID_SNAPSHOT("txid_snapshot", DataCategory.SPECIAL),
68+
69+
INT4RANGE("int4range", DataCategory.RANGE),
70+
INT8RANGE("int8range", DataCategory.RANGE),
71+
TSRANGE("tsrange", DataCategory.RANGE),
72+
TSTZRANGE("tstzrange", DataCategory.RANGE),
73+
DATERANGE("daterange", DataCategory.RANGE),
6474

6575
//Binary data type
6676
BYTEA("bytea", DataCategory.BINARY);
@@ -109,7 +119,8 @@ public enum DataCategory {
109119
SPATIAL,
110120
NETWORK_ADDRESS,
111121
SPECIAL,
112-
BINARY
122+
BINARY,
123+
RANGE
113124
}
114125

115126

@@ -153,5 +164,10 @@ public boolean isSpecial() {
153164
public boolean isBinary() {
154165
return category == DataCategory.BINARY;
155166
}
167+
168+
public boolean isRange() {
169+
return category == DataCategory.RANGE;
170+
}
171+
156172
}
157173

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/PostgresDataTypeHelper.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler.JsonTypeHandler;
77
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler.NetworkAddressTypeHandler;
88
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler.NumericTypeHandler;
9+
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler.RangeTypeHandler;
910
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler.SpatialTypeHandler;
1011
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler.SpecialTypeHandler;
1112
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler.StringTypeHandler;
@@ -15,17 +16,21 @@
1516
import java.util.Map;
1617

1718
public class PostgresDataTypeHelper {
18-
private static final Map<PostgresDataType.DataCategory, PostgresDataTypeHandler> typeHandlers = Map.of(
19-
PostgresDataType.DataCategory.NUMERIC, new NumericTypeHandler(),
20-
PostgresDataType.DataCategory.STRING, new StringTypeHandler(),
21-
PostgresDataType.DataCategory.BIT_STRING, new BitStringTypeHandler(),
22-
PostgresDataType.DataCategory.JSON, new JsonTypeHandler(),
23-
PostgresDataType.DataCategory.BOOLEAN, new BooleanTypeHandler(),
24-
PostgresDataType.DataCategory.TEMPORAL, new TemporalTypeHandler(),
25-
PostgresDataType.DataCategory.SPATIAL, new SpatialTypeHandler(),
26-
PostgresDataType.DataCategory.NETWORK_ADDRESS, new NetworkAddressTypeHandler(),
27-
PostgresDataType.DataCategory.SPECIAL, new SpecialTypeHandler(),
28-
PostgresDataType.DataCategory.BINARY, new BinaryTypeHandler()
19+
private static final NumericTypeHandler numericTypeHandler = new NumericTypeHandler();
20+
private static final TemporalTypeHandler temporalTypeHandler = new TemporalTypeHandler();
21+
22+
private static final Map<PostgresDataType.DataCategory, PostgresDataTypeHandler> typeHandlers = Map.ofEntries(
23+
Map.entry(PostgresDataType.DataCategory.NUMERIC, numericTypeHandler),
24+
Map.entry(PostgresDataType.DataCategory.STRING, new StringTypeHandler()),
25+
Map.entry(PostgresDataType.DataCategory.BIT_STRING, new BitStringTypeHandler()),
26+
Map.entry(PostgresDataType.DataCategory.JSON, new JsonTypeHandler()),
27+
Map.entry(PostgresDataType.DataCategory.BOOLEAN, new BooleanTypeHandler()),
28+
Map.entry(PostgresDataType.DataCategory.TEMPORAL, temporalTypeHandler),
29+
Map.entry(PostgresDataType.DataCategory.SPATIAL, new SpatialTypeHandler()),
30+
Map.entry(PostgresDataType.DataCategory.NETWORK_ADDRESS, new NetworkAddressTypeHandler()),
31+
Map.entry(PostgresDataType.DataCategory.SPECIAL, new SpecialTypeHandler()),
32+
Map.entry(PostgresDataType.DataCategory.BINARY, new BinaryTypeHandler()),
33+
Map.entry(PostgresDataType.DataCategory.RANGE, new RangeTypeHandler(numericTypeHandler, temporalTypeHandler))
2934
);
3035

3136
public static Object getDataByColumnType(final PostgresDataType columnType, final String columnName, final Object value
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.handler;
2+
3+
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType;
4+
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataTypeHandler;
5+
6+
import java.util.HashMap;
7+
import java.util.Map;
8+
9+
public class RangeTypeHandler implements PostgresDataTypeHandler {
10+
11+
public static final String EMPTY = "empty";
12+
public static final String GREATER_THAN = "gt";
13+
public static final String GREATER_THAN_OR_EQUAL_TO = "gte";
14+
public static final String LESSER_THAN = "lt";
15+
public static final String LESSER_THAN_OR_EQUAL_TO = "lte";
16+
17+
private final NumericTypeHandler numericTypeHandler;
18+
private final TemporalTypeHandler temporalTypeHandler;
19+
20+
public RangeTypeHandler(NumericTypeHandler numericTypeHandler, TemporalTypeHandler temporalTypeHandler) {
21+
this.numericTypeHandler = numericTypeHandler;
22+
this.temporalTypeHandler = temporalTypeHandler;
23+
}
24+
25+
@Override
26+
public Object handle(PostgresDataType columnType, String columnName, Object value) {
27+
if (!columnType.isRange()) {
28+
throw new IllegalArgumentException("ColumnType is not range: " + columnType);
29+
}
30+
31+
return parseRangeValue(columnType, columnName, value.toString());
32+
}
33+
34+
private Object parseRangeValue(PostgresDataType columnType,String columnName, String rangeString) {
35+
36+
if(rangeString.equals(EMPTY))
37+
return null;
38+
39+
String cleanRangeString = rangeString.substring(1, rangeString.length() - 1);
40+
String[] rangeValues = cleanRangeString.split(",",-1);
41+
42+
if (rangeValues.length == 2) {
43+
String lowerBound = trimQuotes(rangeValues[0]);
44+
String upperBound = trimQuotes(rangeValues[1]);
45+
String lowerBoundInclusivity = String.valueOf(rangeString.charAt(0));
46+
String upperBoundInclusivity = String.valueOf(rangeString.charAt(rangeString.length() - 1));
47+
switch (columnType) {
48+
case INT4RANGE:
49+
return handleNumericRange(PostgresDataType.INTEGER, columnName, lowerBoundInclusivity, lowerBound, upperBound, upperBoundInclusivity);
50+
case INT8RANGE:
51+
return handleNumericRange(PostgresDataType.BIGINT, columnName, lowerBoundInclusivity, lowerBound, upperBound, upperBoundInclusivity);
52+
case TSRANGE:
53+
return handleTemporalRange(PostgresDataType.TIMESTAMP, columnName, lowerBoundInclusivity, lowerBound, upperBound, upperBoundInclusivity);
54+
case TSTZRANGE:
55+
return handleTemporalRange(PostgresDataType.TIMESTAMPTZ, columnName, lowerBoundInclusivity, lowerBound, upperBound, upperBoundInclusivity);
56+
case DATERANGE:
57+
return handleTemporalRange(PostgresDataType.DATE, columnName, lowerBoundInclusivity, lowerBound, upperBound, upperBoundInclusivity);
58+
default:
59+
throw new IllegalArgumentException("Unsupported range type: " + columnType);
60+
}
61+
} else {
62+
throw new IllegalArgumentException("Invalid range format: " + rangeString);
63+
}
64+
65+
}
66+
private Map<String, Object> handleNumericRange(PostgresDataType columnType, String columnName, String lowerBoundInclusivity, String lowerBound, String upperBound, String upperBoundInclusivity) {
67+
Map<String, Object> rangeMap = new HashMap<>();
68+
69+
if(!lowerBound.isEmpty()) {
70+
Object parsedLowerBound = numericTypeHandler.handle(columnType, columnName, lowerBound);
71+
rangeMap.put(lowerBoundInclusivity.equals("[") ? GREATER_THAN_OR_EQUAL_TO : GREATER_THAN, parsedLowerBound);
72+
}
73+
if(!upperBound.isEmpty()) {
74+
Object parsedUpperBound = numericTypeHandler.handle(columnType, columnName, upperBound);
75+
rangeMap.put(upperBoundInclusivity.equals("]") ? LESSER_THAN_OR_EQUAL_TO : LESSER_THAN, parsedUpperBound);
76+
}
77+
return rangeMap;
78+
}
79+
80+
private Map<String, Object> handleTemporalRange(PostgresDataType columnType, String columnName, String lowerBoundInclusivity, String lowerBound, String upperBound, String upperBoundInclusivity) {
81+
Map<String, Object> rangeMap = new HashMap<>();
82+
83+
if(!lowerBound.isEmpty()) {
84+
Object parsedLowerBound = temporalTypeHandler.handle(columnType, columnName, lowerBound);
85+
rangeMap.put(lowerBoundInclusivity.equals("[") ? GREATER_THAN_OR_EQUAL_TO : GREATER_THAN, parsedLowerBound);
86+
}
87+
88+
if (!upperBound.isEmpty()) {
89+
Object parsedUpperBound = temporalTypeHandler.handle(columnType, columnName, upperBound);
90+
rangeMap.put(upperBoundInclusivity.equals("]") ? LESSER_THAN_OR_EQUAL_TO : LESSER_THAN, parsedUpperBound);
91+
}
92+
return rangeMap;
93+
}
94+
95+
96+
private String trimQuotes(String input) {
97+
if (input == null || input.isEmpty()) {
98+
return input;
99+
}
100+
101+
String trimmed = input;
102+
103+
if (trimmed.startsWith("\"") && trimmed.endsWith("\"")) {
104+
trimmed = trimmed.substring(1, trimmed.length() - 1);
105+
}
106+
return trimmed;
107+
}
108+
109+
}

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.List;
3131
import java.util.Map;
3232
import java.util.Optional;
33+
import java.util.Set;
3334
import java.util.UUID;
3435
import java.util.stream.Collectors;
3536

@@ -180,6 +181,14 @@ private Map<String, List<String>> getPrimaryKeyMap() {
180181
));
181182
}
182183

184+
private Map<String, Set<String>> getPostgresEnumColumnsByTable() {
185+
return sourceConfig.getTableNames().stream()
186+
.collect(Collectors.toMap(
187+
fullTableName -> fullTableName,
188+
fullTableName -> ((PostgresSchemaManager)schemaManager).getEnumColumns(fullTableName)
189+
));
190+
}
191+
183192
private void createStreamPartition(RdsSourceConfig sourceConfig) {
184193
final StreamProgressState progressState = new StreamProgressState();
185194
progressState.setEngineType(sourceConfig.getEngine().toString());
@@ -199,6 +208,7 @@ private void createStreamPartition(RdsSourceConfig sourceConfig) {
199208
final PostgresStreamState postgresStreamState = new PostgresStreamState();
200209
postgresStreamState.setPublicationName(publicationName);
201210
postgresStreamState.setReplicationSlotName(slotName);
211+
postgresStreamState.setEnumColumnsByTable(getPostgresEnumColumnsByTable());
202212
progressState.setPostgresStreamState(postgresStreamState);
203213
}
204214
streamPartition = new StreamPartition(sourceConfig.getDbIdentifier(), progressState);

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/MessageType.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ public enum MessageType {
1818
INSERT('I'),
1919
UPDATE('U'),
2020
DELETE('D'),
21-
COMMIT('C');
21+
COMMIT('C'),
22+
TYPE('Y');
2223

2324
private final char value;
2425

@@ -28,7 +29,8 @@ public enum MessageType {
2829
INSERT.getValue(), INSERT,
2930
UPDATE.getValue(), UPDATE,
3031
DELETE.getValue(), DELETE,
31-
COMMIT.getValue(), COMMIT
32+
COMMIT.getValue(), COMMIT,
33+
TYPE.getValue(), TYPE
3234
);
3335

3436
MessageType(char value) {

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/MySqlSchemaManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ public Map<String, String> getColumnDataTypes(final String fullTableName) {
9191
);
9292
}
9393
}
94+
return columnsToDataType;
9495
} catch (final Exception e) {
9596
LOG.error("Failed to get dataTypes for database {} table {}, retrying", database, tableName, e);
9697
if (retry == NUM_OF_RETRIES) {

0 commit comments

Comments
 (0)