Skip to content

Commit 425bdfd

Browse files
authored
[Optimization-4481][metadata] Unify the data types of metadata fields (#4482)
1 parent e355e2c commit 425bdfd

74 files changed

Lines changed: 4475 additions & 1274 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/convert/DataTypeConverter.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,24 +61,18 @@ public class DataTypeConverter {
6161
public static final long MILLIS_PER_DAY = 86400000L; // = 24 * 60 * 60 * 1000
6262

6363
public static LogicalType getLogicalType(Column column) {
64-
switch (column.getJavaType()) {
64+
switch (column.getDataType().getValue()) {
6565
case BOOLEAN:
66-
case JAVA_LANG_BOOLEAN:
6766
return new BooleanType();
68-
case BYTE:
69-
case JAVA_LANG_BYTE:
67+
case TINYINT:
7068
return new TinyIntType();
71-
case SHORT:
72-
case JAVA_LANG_SHORT:
69+
case SMALLINT:
7370
return new SmallIntType();
74-
case LONG:
75-
case JAVA_LANG_LONG:
71+
case BIGINT:
7672
return new BigIntType();
7773
case FLOAT:
78-
case JAVA_LANG_FLOAT:
7974
return new FloatType();
8075
case DOUBLE:
81-
case JAVA_LANG_DOUBLE:
8276
return new DoubleType();
8377
case DECIMAL:
8478
if (column.getPrecision() == null || column.getPrecision() == 0) {
@@ -87,19 +81,15 @@ public static LogicalType getLogicalType(Column column) {
8781
return new DecimalType(column.getPrecision(), column.getScale());
8882
}
8983
case INT:
90-
case INTEGER:
9184
return new IntType();
9285
case TIME:
93-
case LOCALTIME:
9486
return new TimeType(
9587
column.isNullable(),
9688
column.getLength() == 0
9789
? (column.getPrecision() == null ? 0 : column.getPrecision())
9890
: column.getLength());
9991
case DATE:
100-
case LOCAL_DATE:
10192
return new DateType();
102-
case LOCAL_DATETIME:
10393
case TIMESTAMP:
10494
if (Asserts.isNotNull(column.getLength())) {
10595
return new TimestampType(column.getLength());

dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/utils/FlinkStatementUtil.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ public static String getFlinkDDL(
8080
sb.append("` (\n");
8181
List<String> pks = new ArrayList<>();
8282
for (int i = 0; i < table.getColumns().size(); i++) {
83-
String type = table.getColumns().get(i).getFlinkType();
83+
String type =
84+
table.getColumns().get(i).getDataType().getLogicalType().asSummaryString();
8485
sb.append(" ");
8586
if (i > 0) {
8687
sb.append(",");

dinky-client/dinky-client-base/src/main/java/org/dinky/utils/FlinkTableMetadataUtil.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.dinky.utils;
2121

22-
import org.dinky.data.enums.ColumnType;
2322
import org.dinky.data.model.Catalog;
2423
import org.dinky.data.model.Column;
2524
import org.dinky.data.model.Schema;
@@ -124,10 +123,7 @@ public static List<Column> getColumnList(
124123
column.setScale(((DecimalType) logicalType).getScale());
125124
}
126125

127-
String dataTypeName =
128-
flinkColumn.getDataType().getConversionClass().getName();
129-
ColumnType columnType = ColumnType.getByJavaType(dataTypeName);
130-
column.setJavaType(columnType);
126+
column.buildDataType();
131127
columns.add(column);
132128
}
133129
});

dinky-common/src/main/java/org/dinky/assertion/Asserts.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,18 @@ public static void checkNull(Object key, String msg) {
9393
}
9494
}
9595

96-
public static void checkNotNull(Object object, String msg) {
97-
if (isNull(object)) {
96+
public static <T> T checkNotNull(T reference) {
97+
if (isNull(reference)) {
98+
throw new NullPointerException();
99+
}
100+
return reference;
101+
}
102+
103+
public static <T> T checkNotNull(T reference, String msg) {
104+
if (isNull(reference)) {
98105
throw new BusException(msg);
99106
}
107+
return reference;
100108
}
101109

102110
public static void checkNullString(String key, String msg) {
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one or more
4+
* contributor license agreements. See the NOTICE file distributed with
5+
* this work for additional information regarding copyright ownership.
6+
* The ASF licenses this file to You under the Apache License, Version 2.0
7+
* (the "License"); you may not use this file except in compliance with
8+
* the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*
18+
*/
19+
20+
package org.dinky.data.exception;
21+
22+
public class ValidationException extends RuntimeException {
23+
24+
public ValidationException(String message, Throwable cause) {
25+
super(message, cause);
26+
}
27+
28+
public ValidationException(String message) {
29+
super(message);
30+
}
31+
}

dinky-common/src/main/java/org/dinky/data/model/Column.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
package org.dinky.data.model;
2121

22-
import org.dinky.data.enums.ColumnType;
22+
import org.dinky.data.types.ColumnType;
23+
import org.dinky.data.types.DataTypes;
24+
import org.dinky.data.types.LogicalTypeParam;
2325

2426
import java.io.Serializable;
2527

@@ -51,7 +53,7 @@ public class Column implements Serializable {
5153
private boolean autoIncrement;
5254
private String defaultValue;
5355
private boolean isNullable;
54-
private ColumnType javaType;
56+
private ColumnType dataType;
5557
private String columnFamily;
5658
private Integer position;
5759
private Integer length;
@@ -60,17 +62,10 @@ public class Column implements Serializable {
6062
private String characterSet;
6163
private String collation;
6264

63-
public String getFlinkType() {
64-
String flinkType = javaType.getFlinkType();
65-
if (!flinkType.equals("DECIMAL")) {
66-
return flinkType;
67-
}
68-
69-
Integer defaultPrecision = precision;
70-
if (precision == null || precision == 0) {
71-
defaultPrecision = 38;
72-
}
73-
74-
return String.format("%s(%d,%d)", flinkType, defaultPrecision, scale);
65+
public ColumnType buildDataType() {
66+
final LogicalTypeParam logicalTypeParam = LogicalTypeParam.of(isNullable, length, precision, scale);
67+
final DataTypes dataTypes = DataTypes.of(type);
68+
dataType = ColumnType.of(dataTypes, dataTypes.copyLogicalType(logicalTypeParam));
69+
return dataType;
7570
}
7671
}

dinky-common/src/main/java/org/dinky/data/model/Table.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,11 @@ public String getFlinkDDL(String flinkConfig, String tableName) {
128128
comment = String.format(
129129
" COMMENT '%s'", column.getComment().replaceAll("[\"']", ""));
130130
}
131-
return String.format(" `%s` %s%s", column.getName(), column.getFlinkType(), comment);
131+
return String.format(
132+
" `%s` %s %s",
133+
column.getName(),
134+
column.getDataType().getLogicalType().asSummaryString(),
135+
comment);
132136
})
133137
.collect(Collectors.joining(",\n"));
134138

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one or more
4+
* contributor license agreements. See the NOTICE file distributed with
5+
* this work for additional information regarding copyright ownership.
6+
* The ASF licenses this file to You under the Apache License, Version 2.0
7+
* (the "License"); you may not use this file except in compliance with
8+
* the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*
18+
*/
19+
20+
package org.dinky.data.types;
21+
22+
import org.dinky.assertion.Asserts;
23+
24+
import java.util.Collections;
25+
import java.util.List;
26+
import java.util.Objects;
27+
28+
public final class ArrayType extends LogicalType {
29+
30+
private static final long serialVersionUID = 1L;
31+
32+
public static final String FORMAT = "ARRAY<%s>";
33+
34+
private final LogicalType elementType;
35+
36+
public ArrayType(boolean isNullable, LogicalType elementType) {
37+
super(isNullable, LogicalTypeRoot.ARRAY);
38+
this.elementType = Asserts.checkNotNull(elementType, "Element type must not be null.");
39+
}
40+
41+
public ArrayType(LogicalType elementType) {
42+
this(true, elementType);
43+
}
44+
45+
public LogicalType getElementType() {
46+
return elementType;
47+
}
48+
49+
@Override
50+
public LogicalType copy(boolean isNullable) {
51+
return new ArrayType(isNullable, elementType.copy());
52+
}
53+
54+
@Override
55+
public LogicalType copy(LogicalTypeParam param) {
56+
if (param == null || param.getNullable() == null) {
57+
return copy();
58+
}
59+
return copy(param.getNullable());
60+
}
61+
62+
@Override
63+
public String asSummaryString() {
64+
return withNullability(FORMAT, elementType.asSummaryString());
65+
}
66+
67+
@Override
68+
public String asSerializableString() {
69+
return withNullability(FORMAT, elementType.asSerializableString());
70+
}
71+
72+
@Override
73+
public List<LogicalType> getChildren() {
74+
return Collections.singletonList(elementType);
75+
}
76+
77+
@Override
78+
public boolean equals(Object o) {
79+
if (this == o) {
80+
return true;
81+
}
82+
if (o == null || getClass() != o.getClass()) {
83+
return false;
84+
}
85+
if (!super.equals(o)) {
86+
return false;
87+
}
88+
ArrayType arrayType = (ArrayType) o;
89+
return elementType.equals(arrayType.elementType);
90+
}
91+
92+
@Override
93+
public int hashCode() {
94+
return Objects.hash(super.hashCode(), elementType);
95+
}
96+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one or more
4+
* contributor license agreements. See the NOTICE file distributed with
5+
* this work for additional information regarding copyright ownership.
6+
* The ASF licenses this file to You under the Apache License, Version 2.0
7+
* (the "License"); you may not use this file except in compliance with
8+
* the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*
18+
*/
19+
20+
package org.dinky.data.types;
21+
22+
import java.util.Collections;
23+
import java.util.List;
24+
25+
public final class BigIntType extends LogicalType {
26+
private static final long serialVersionUID = 1L;
27+
28+
public static final int PRECISION = 19;
29+
30+
private static final String FORMAT = "BIGINT";
31+
32+
public BigIntType(boolean isNullable) {
33+
super(isNullable, LogicalTypeRoot.BIGINT);
34+
}
35+
36+
public BigIntType() {
37+
this(true);
38+
}
39+
40+
@Override
41+
public LogicalType copy(boolean isNullable) {
42+
return new BigIntType(isNullable);
43+
}
44+
45+
@Override
46+
public LogicalType copy(LogicalTypeParam param) {
47+
if (param == null || param.getNullable() == null) {
48+
return copy();
49+
}
50+
return copy(param.getNullable());
51+
}
52+
53+
@Override
54+
public String asSerializableString() {
55+
return withNullability(FORMAT);
56+
}
57+
58+
@Override
59+
public List<LogicalType> getChildren() {
60+
return Collections.emptyList();
61+
}
62+
}

0 commit comments

Comments
 (0)