Skip to content

Commit cc533e8

Browse files
author
Kane
committed
Fix: Adjust VARCHAR byte length calculation to fully support UTF-8 (utf8mb4) (#620)
The current logic in `TypeConverter.java` uses a multiplier of `3` to calculate the required byte length for the Doris `VARCHAR` type: ```java // Current implementation return length * 3 > 65533 ? DorisType.STRING : String.format("%s(%s)", DorisType.VARCHAR, length * 3); ``` This assumes a maximum of 3 bytes per character, which is insufficient for the widely used utf8mb4 character set (common in MySQL/MariaDB and other sources). The utf8mb4 encoding supports the full range of Unicode characters (including emojis), requiring up to 4 bytes per character. If a source column contains 4-byte characters, the calculated byte length may underestimate the required size, leading to: Data truncation or corruption during the synchronization process. Load failures with errors such as "data length exceeded" or "row size too large" when Doris enforces the byte limit. Proposed Solution This change updates the byte multiplier from 3 to 4 to safely accommodate the full utf8mb4 character set, ensuring the calculated byte length is always sufficient for the defined character length, thus guaranteeing data integrity and preventing sync failures.
1 parent 3e6e0ab commit cc533e8

13 files changed

Lines changed: 37 additions & 37 deletions

File tree

flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ private static class LogicalTypeVisitor extends LogicalTypeDefaultVisitor<String
155155

156156
@Override
157157
public String visit(CharType charType) {
158-
long length = charType.getLength() * 3L;
158+
long length = charType.getLength() * 4L;
159159
if (length <= MAX_CHAR_SIZE) {
160160
return String.format("%s(%s)", DorisType.CHAR, length);
161161
} else {
@@ -166,7 +166,7 @@ public String visit(CharType charType) {
166166
@Override
167167
public String visit(VarCharType varCharType) {
168168
// Flink varchar length max value is int, it may overflow after multiplying by 3
169-
long length = varCharType.getLength() * 3L;
169+
long length = varCharType.getLength() * 4L;
170170
return length >= MAX_VARCHAR_SIZE ? STRING : String.format("%s(%s)", VARCHAR, length);
171171
}
172172

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,12 +148,12 @@ private String handleType(String type) {
148148
if (type == null || "".equals(type)) {
149149
return "";
150150
}
151-
// varchar len * 3
151+
// varchar len * 4
152152
Pattern pattern = Pattern.compile("varchar\\(([1-9][0-9]*)\\)", Pattern.CASE_INSENSITIVE);
153153
Matcher matcher = pattern.matcher(type);
154154
if (matcher.find()) {
155155
String len = matcher.group(1);
156-
return String.format("varchar(%d)", Math.min(Integer.parseInt(len) * 3, 65533));
156+
return String.format("varchar(%d)", Math.min(Integer.parseInt(len) * 4, 65533));
157157
}
158158

159159
return type;

flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Type.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,9 @@ public static String toDorisType(String db2Type, Integer precision, Integer scal
7676
case VARCHAR:
7777
case LONG_VARCHAR:
7878
Preconditions.checkNotNull(precision);
79-
return precision * 3 > 65533
79+
return precision * 4 > 65533
8080
? DorisType.STRING
81-
: String.format("%s(%s)", DorisType.VARCHAR, precision * 3);
81+
: String.format("%s(%s)", DorisType.VARCHAR, precision * 4);
8282
case TIMESTAMP:
8383
return String.format(
8484
"%s(%s)", DorisType.DATETIME_V2, Math.min(scale == null ? 0 : scale, 6));

flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,9 +192,9 @@ public static String toDorisType(String type, Integer length, Integer scale) {
192192
case CHAR:
193193
case VARCHAR:
194194
Preconditions.checkNotNull(length);
195-
return length * 3 > 65533
195+
return length * 4 > 65533
196196
? DorisType.STRING
197-
: String.format("%s(%s)", DorisType.VARCHAR, length * 3);
197+
: String.format("%s(%s)", DorisType.VARCHAR, length * 4);
198198
case TINYTEXT:
199199
case TEXT:
200200
case MEDIUMTEXT:

flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleType.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,9 @@ public static String toDorisType(String oracleType, Integer precision, Integer s
9191
case NCHAR:
9292
case NVARCHAR2:
9393
Preconditions.checkNotNull(precision);
94-
return precision * 3 > 65533
94+
return precision * 4 > 65533
9595
? DorisType.STRING
96-
: String.format("%s(%s)", DorisType.VARCHAR, precision * 3);
96+
: String.format("%s(%s)", DorisType.VARCHAR, precision * 4);
9797
case LONG:
9898
case RAW:
9999
case LONG_RAW:

flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,9 @@ public static String toDorisType(String postgresType, Integer precision, Integer
107107
case BPCHAR:
108108
case VARCHAR:
109109
Preconditions.checkNotNull(precision);
110-
return precision * 3 > 65533
110+
return precision * 4 > 65533
111111
? DorisType.STRING
112-
: String.format("%s(%s)", DorisType.VARCHAR, precision * 3);
112+
: String.format("%s(%s)", DorisType.VARCHAR, precision * 4);
113113
case POINT:
114114
case LINE:
115115
case LSEG:

flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,9 @@ public static String toDorisType(String originSqlServerType, Integer precision,
9494
case VARCHAR:
9595
case NCHAR:
9696
case NVARCHAR:
97-
return precision * 3 > 65533
97+
return precision * 4 > 65533
9898
? DorisType.STRING
99-
: String.format("%s(%s)", DorisType.VARCHAR, precision * 3);
99+
: String.format("%s(%s)", DorisType.VARCHAR, precision * 4);
100100
case TEXT:
101101
case NTEXT:
102102
case TIME:

flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisTypeMapperTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,9 @@ public void testCharType() {
109109
DataType result = DorisTypeMapper.toFlinkType("col", "CHAR", 10, 0);
110110
assertEquals(DataTypes.CHAR(10), result);
111111
String dorisType = DorisTypeMapper.toDorisType(DataTypes.CHAR(10));
112-
assertEquals("CHAR(30)", dorisType);
112+
assertEquals("CHAR(40)", dorisType);
113113
dorisType = DorisTypeMapper.toDorisType(DataTypes.CHAR(100));
114-
assertEquals("VARCHAR(300)", dorisType);
114+
assertEquals("VARCHAR(400)", dorisType);
115115
}
116116

117117
@Test

0 commit comments

Comments
 (0)