From 550b627363d3a154eade460cb75fab659dbf8a09 Mon Sep 17 00:00:00 2001 From: Kane Date: Tue, 18 Nov 2025 16:54:05 +0800 Subject: [PATCH] Fix: Adjust VARCHAR byte length calculation to fully support UTF-8 (utf8mb4) (#620) The current logic in `TypeConverter.java` uses a multiplier of `3` to calculate the required byte length for the Doris `VARCHAR` type: ```java // Current implementation return length * 3 > 65533 ? DorisType.STRING : String.format("%s(%s)", DorisType.VARCHAR, length * 3); ``` This assumes a maximum of 3 bytes per character, which is insufficient for the widely used utf8mb4 character set (common in MySQL/MariaDB and other sources). The utf8mb4 encoding supports the full range of Unicode characters (including emojis), requiring up to 4 bytes per character. If a source column contains 4-byte characters, the calculated byte length may underestimate the required size, leading to: Data truncation or corruption during the synchronization process. Load failures with errors such as "data length exceeded" or "row size too large" when Doris enforces the byte limit. Proposed Solution This change updates the byte multiplier from 3 to 4 to safely accommodate the full utf8mb4 character set, ensuring the calculated byte length is always sufficient for the defined character length, thus guaranteeing data integrity and preventing sync failures. --- .../doris/flink/catalog/DorisTypeMapper.java | 4 ++-- .../JsonDebeziumSchemaChangeImpl.java | 4 ++-- .../doris/flink/tools/cdc/db2/Db2Type.java | 4 ++-- .../doris/flink/tools/cdc/mysql/MysqlType.java | 4 ++-- .../flink/tools/cdc/oracle/OracleType.java | 4 ++-- .../flink/tools/cdc/postgres/PostgresType.java | 4 ++-- .../tools/cdc/sqlserver/SqlServerType.java | 4 ++-- .../flink/catalog/DorisTypeMapperTest.java | 6 +++--- .../schema/SQLParserSchemaManagerTest.java | 18 +++++++++--------- .../TestJsonDebeziumSchemaChangeImpl.java | 6 +++--- .../TestJsonDebeziumSchemaChangeImplV2.java | 14 +++++++------- .../TestSQLParserSchemaChange.java | 6 +++--- .../doris/flink/tools/cdc/db2/Db2TypeTest.java | 6 +++--- 13 files changed, 42 insertions(+), 42 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java index bfca184ec..899a84e05 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java @@ -155,7 +155,7 @@ private static class LogicalTypeVisitor extends LogicalTypeDefaultVisitor= MAX_VARCHAR_SIZE ? STRING : String.format("%s(%s)", VARCHAR, length); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java index 09f0f3a69..2bc8a981c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java @@ -148,12 +148,12 @@ private String handleType(String type) { if (type == null || "".equals(type)) { return ""; } - // varchar len * 3 + // varchar len * 4 Pattern pattern = Pattern.compile("varchar\\(([1-9][0-9]*)\\)", Pattern.CASE_INSENSITIVE); Matcher matcher = pattern.matcher(type); if (matcher.find()) { String len = matcher.group(1); - return String.format("varchar(%d)", Math.min(Integer.parseInt(len) * 3, 65533)); + return String.format("varchar(%d)", Math.min(Integer.parseInt(len) * 4, 65533)); } return type; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Type.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Type.java index 1255d1e7c..fed1988e9 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Type.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Type.java @@ -76,9 +76,9 @@ public static String toDorisType(String db2Type, Integer precision, Integer scal case VARCHAR: case LONG_VARCHAR: Preconditions.checkNotNull(precision); - return precision * 3 > 65533 + return precision * 4 > 65533 ? DorisType.STRING - : String.format("%s(%s)", DorisType.VARCHAR, precision * 3); + : String.format("%s(%s)", DorisType.VARCHAR, precision * 4); case TIMESTAMP: return String.format( "%s(%s)", DorisType.DATETIME_V2, Math.min(scale == null ? 0 : scale, 6)); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java index 49afe3751..ebd3009f0 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java @@ -192,9 +192,9 @@ public static String toDorisType(String type, Integer length, Integer scale) { case CHAR: case VARCHAR: Preconditions.checkNotNull(length); - return length * 3 > 65533 + return length * 4 > 65533 ? DorisType.STRING - : String.format("%s(%s)", DorisType.VARCHAR, length * 3); + : String.format("%s(%s)", DorisType.VARCHAR, length * 4); case TINYTEXT: case TEXT: case MEDIUMTEXT: diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleType.java index 304f21fd0..be687ab93 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleType.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleType.java @@ -91,9 +91,9 @@ public static String toDorisType(String oracleType, Integer precision, Integer s case NCHAR: case NVARCHAR2: Preconditions.checkNotNull(precision); - return precision * 3 > 65533 + return precision * 4 > 65533 ? DorisType.STRING - : String.format("%s(%s)", DorisType.VARCHAR, precision * 3); + : String.format("%s(%s)", DorisType.VARCHAR, precision * 4); case LONG: case RAW: case LONG_RAW: diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java index ddffb6d9f..9187b5229 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java @@ -107,9 +107,9 @@ public static String toDorisType(String postgresType, Integer precision, Integer case BPCHAR: case VARCHAR: Preconditions.checkNotNull(precision); - return precision * 3 > 65533 + return precision * 4 > 65533 ? DorisType.STRING - : String.format("%s(%s)", DorisType.VARCHAR, precision * 3); + : String.format("%s(%s)", DorisType.VARCHAR, precision * 4); case POINT: case LINE: case LSEG: diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java index ff37c06d5..c96eb5c4b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java @@ -94,9 +94,9 @@ public static String toDorisType(String originSqlServerType, Integer precision, case VARCHAR: case NCHAR: case NVARCHAR: - return precision * 3 > 65533 + return precision * 4 > 65533 ? DorisType.STRING - : String.format("%s(%s)", DorisType.VARCHAR, precision * 3); + : String.format("%s(%s)", DorisType.VARCHAR, precision * 4); case TEXT: case NTEXT: case TIME: diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisTypeMapperTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisTypeMapperTest.java index 84cd367c8..5217e9b86 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisTypeMapperTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisTypeMapperTest.java @@ -109,9 +109,9 @@ public void testCharType() { DataType result = DorisTypeMapper.toFlinkType("col", "CHAR", 10, 0); assertEquals(DataTypes.CHAR(10), result); String dorisType = DorisTypeMapper.toDorisType(DataTypes.CHAR(10)); - assertEquals("CHAR(30)", dorisType); + assertEquals("CHAR(40)", dorisType); dorisType = DorisTypeMapper.toDorisType(DataTypes.CHAR(100)); - assertEquals("VARCHAR(300)", dorisType); + assertEquals("VARCHAR(400)", dorisType); } @Test @@ -119,7 +119,7 @@ public void testVarcharType() { DataType result = DorisTypeMapper.toFlinkType("col", "VARCHAR", 50, 0); assertEquals(DataTypes.VARCHAR(50), result); String dorisType = DorisTypeMapper.toDorisType(DataTypes.VARCHAR(50)); - assertEquals("VARCHAR(150)", dorisType); + assertEquals("VARCHAR(200)", dorisType); } @Test diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java index c7b23c18e..77c8cb9df 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java @@ -68,8 +68,8 @@ public void testParserAlterDDLs() { @Test public void testParserAlterDDLsAdd() { List expectDDLs = new ArrayList<>(); - expectDDLs.add("ALTER TABLE `doris`.`tab` ADD COLUMN `phone_number` VARCHAR(60)"); - expectDDLs.add("ALTER TABLE `doris`.`tab` ADD COLUMN `address` VARCHAR(765)"); + expectDDLs.add("ALTER TABLE `doris`.`tab` ADD COLUMN `phone_number` VARCHAR(80)"); + expectDDLs.add("ALTER TABLE `doris`.`tab` ADD COLUMN `address` VARCHAR(1020)"); SourceConnector mysql = SourceConnector.ORACLE; String ddl = @@ -265,7 +265,7 @@ public void testParseCreateTableStatement() { SourceConnector.MYSQL, ddl, dorisTable, null); String expected = - "TableSchema{database='doris', table='auto_tab', tableComment='test_sinka', fields={`id`=FieldSchema{name='`id`', typeString='INT', defaultValue='10000', comment='id_test'}, `create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'}, `c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', typeString='DECIMALV3(9,3)', defaultValue='1.000', comment='decimal_tes'}, `aaa`=FieldSchema{name='`aaa`', typeString='VARCHAR(300)', defaultValue='NULL', comment='null'}, `decimal_type3`=FieldSchema{name='`decimal_type3`', typeString='DECIMALV3(38,9)', defaultValue='1.123456789', comment='comment_test'}, `create_time3`=FieldSchema{name='`create_time3`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='ttime_aaa'}}, keys=`id`, model=UNIQUE, distributeKeys=`id`, properties={}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_tab', tableComment='test_sinka', fields={`id`=FieldSchema{name='`id`', typeString='INT', defaultValue='10000', comment='id_test'}, `create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'}, `c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', typeString='DECIMALV3(9,3)', defaultValue='1.000', comment='decimal_tes'}, `aaa`=FieldSchema{name='`aaa`', typeString='VARCHAR(400)', defaultValue='NULL', comment='null'}, `decimal_type3`=FieldSchema{name='`decimal_type3`', typeString='DECIMALV3(38,9)', defaultValue='1.123456789', comment='comment_test'}, `create_time3`=FieldSchema{name='`create_time3`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='ttime_aaa'}}, keys=`id`, model=UNIQUE, distributeKeys=`id`, properties={}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } @@ -289,7 +289,7 @@ public void testParseCreateTableUnsignedStatement() { SourceConnector.MYSQL, ddl, dorisTable, null); String expected = - "TableSchema{database='doris', table='auto_tab', tableComment='test_sinka', fields={`id`=FieldSchema{name='`id`', typeString='BIGINT', defaultValue='10000', comment='id_test'}, `id2`=FieldSchema{name='`id2`', typeString='LARGEINT', defaultValue='10000', comment='id2_comment'}, `create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'}, `c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', typeString='DECIMALV3(9,3)', defaultValue='1.000', comment='decimal_tes'}, `aaa`=FieldSchema{name='`aaa`', typeString='VARCHAR(300)', defaultValue='NULL', comment='null'}, `decimal_type3`=FieldSchema{name='`decimal_type3`', typeString='DECIMALV3(38,9)', defaultValue='1.123456789', comment='comment_test'}, `create_time3`=FieldSchema{name='`create_time3`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='ttime_aaa'}}, keys=`id`, model=UNIQUE, distributeKeys=`id`, properties={}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_tab', tableComment='test_sinka', fields={`id`=FieldSchema{name='`id`', typeString='BIGINT', defaultValue='10000', comment='id_test'}, `id2`=FieldSchema{name='`id2`', typeString='LARGEINT', defaultValue='10000', comment='id2_comment'}, `create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'}, `c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', typeString='DECIMALV3(9,3)', defaultValue='1.000', comment='decimal_tes'}, `aaa`=FieldSchema{name='`aaa`', typeString='VARCHAR(400)', defaultValue='NULL', comment='null'}, `decimal_type3`=FieldSchema{name='`decimal_type3`', typeString='DECIMALV3(38,9)', defaultValue='1.123456789', comment='comment_test'}, `create_time3`=FieldSchema{name='`create_time3`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='ttime_aaa'}}, keys=`id`, model=UNIQUE, distributeKeys=`id`, properties={}, tableBuckets=null}"; System.out.println(tableSchema.toString()); Assert.assertEquals(expected, tableSchema.toString()); } @@ -304,7 +304,7 @@ public void testParseCreateUniqueTableStatement() { SourceConnector.MYSQL, ddl, dorisTable, null); String expected = - "TableSchema{database='doris', table='auto_uni_tab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(300)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', comment='null'}}, keys=email, model=UNIQUE, distributeKeys=email, properties={}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_uni_tab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(400)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(400)', defaultValue='null', comment='null'}}, keys=email, model=UNIQUE, distributeKeys=email, properties={}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } @@ -323,7 +323,7 @@ public void testParseCreateDuplicateTableStatement() { SourceConnector.MYSQL, ddl, dorisTable, null); String expected = - "TableSchema{database='doris', table='auto_duptab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, address=FieldSchema{name='address', typeString='VARCHAR(765)', defaultValue='null', comment='null'}}, keys=id, model=DUPLICATE, distributeKeys=id, properties={}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_duptab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(200)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, address=FieldSchema{name='address', typeString='VARCHAR(1020)', defaultValue='null', comment='null'}}, keys=id, model=DUPLICATE, distributeKeys=id, properties={}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } @@ -352,7 +352,7 @@ public void testParseOracleTableStatement() { SourceConnector.ORACLE, ddl, dorisTable, null); String expected = - "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={employee_id=FieldSchema{name='employee_id', typeString='BIGINT', defaultValue='null', comment='null'}, first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', comment='null'}, phone_number=FieldSchema{name='phone_number', typeString='VARCHAR(60)', defaultValue='null', comment='null'}, hire_date=FieldSchema{name='hire_date', typeString='DATETIMEV2', defaultValue='CURRENT_TIMESTAMP', comment='null'}, job_id=FieldSchema{name='job_id', typeString='VARCHAR(30)', defaultValue='null', comment='null'}, salary=FieldSchema{name='salary', typeString='DECIMALV3(8,2)', defaultValue='null', comment='null'}, commission_pct=FieldSchema{name='commission_pct', typeString='DECIMALV3(2,2)', defaultValue='null', comment='null'}, manager_id=FieldSchema{name='manager_id', typeString='BIGINT', defaultValue='null', comment='null'}, department_id=FieldSchema{name='department_id', typeString='BIGINT', defaultValue='null', comment='null'}}, keys=employee_id, model=UNIQUE, distributeKeys=employee_id, properties={}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={employee_id=FieldSchema{name='employee_id', typeString='BIGINT', defaultValue='null', comment='null'}, first_name=FieldSchema{name='first_name', typeString='VARCHAR(200)', defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', typeString='VARCHAR(200)', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(400)', defaultValue='null', comment='null'}, phone_number=FieldSchema{name='phone_number', typeString='VARCHAR(80)', defaultValue='null', comment='null'}, hire_date=FieldSchema{name='hire_date', typeString='DATETIMEV2', defaultValue='CURRENT_TIMESTAMP', comment='null'}, job_id=FieldSchema{name='job_id', typeString='VARCHAR(40)', defaultValue='null', comment='null'}, salary=FieldSchema{name='salary', typeString='DECIMALV3(8,2)', defaultValue='null', comment='null'}, commission_pct=FieldSchema{name='commission_pct', typeString='DECIMALV3(2,2)', defaultValue='null', comment='null'}, manager_id=FieldSchema{name='manager_id', typeString='BIGINT', defaultValue='null', comment='null'}, department_id=FieldSchema{name='department_id', typeString='BIGINT', defaultValue='null', comment='null'}}, keys=employee_id, model=UNIQUE, distributeKeys=employee_id, properties={}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } @@ -378,7 +378,7 @@ public void testParseOraclePrimaryTableStatement() { SourceConnector.ORACLE, ddl, dorisTable, null); String expected = - "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={employee_id=FieldSchema{name='employee_id', typeString='BIGINT', defaultValue='null', comment='null'}, first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', comment='null'}, phone_number=FieldSchema{name='phone_number', typeString='VARCHAR(60)', defaultValue='null', comment='null'}, hire_date=FieldSchema{name='hire_date', typeString='DATETIMEV2', defaultValue='CURRENT_TIMESTAMP', comment='null'}, job_id=FieldSchema{name='job_id', typeString='VARCHAR(30)', defaultValue='null', comment='null'}, salary=FieldSchema{name='salary', typeString='DECIMALV3(8,2)', defaultValue='null', comment='null'}, commission_pct=FieldSchema{name='commission_pct', typeString='DECIMALV3(2,2)', defaultValue='null', comment='null'}, manager_id=FieldSchema{name='manager_id', typeString='BIGINT', defaultValue='null', comment='null'}, department_id=FieldSchema{name='department_id', typeString='BIGINT', defaultValue='null', comment='null'}}, keys=employee_id, model=UNIQUE, distributeKeys=employee_id, properties={}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={employee_id=FieldSchema{name='employee_id', typeString='BIGINT', defaultValue='null', comment='null'}, first_name=FieldSchema{name='first_name', typeString='VARCHAR(200)', defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', typeString='VARCHAR(200)', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(400)', defaultValue='null', comment='null'}, phone_number=FieldSchema{name='phone_number', typeString='VARCHAR(80)', defaultValue='null', comment='null'}, hire_date=FieldSchema{name='hire_date', typeString='DATETIMEV2', defaultValue='CURRENT_TIMESTAMP', comment='null'}, job_id=FieldSchema{name='job_id', typeString='VARCHAR(40)', defaultValue='null', comment='null'}, salary=FieldSchema{name='salary', typeString='DECIMALV3(8,2)', defaultValue='null', comment='null'}, commission_pct=FieldSchema{name='commission_pct', typeString='DECIMALV3(2,2)', defaultValue='null', comment='null'}, manager_id=FieldSchema{name='manager_id', typeString='BIGINT', defaultValue='null', comment='null'}, department_id=FieldSchema{name='department_id', typeString='BIGINT', defaultValue='null', comment='null'}}, keys=employee_id, model=UNIQUE, distributeKeys=employee_id, properties={}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } @@ -405,7 +405,7 @@ public void testParseOracleDuplicateTableStatement() { dorisTable, new DorisTableConfig(new HashMap<>())); String expected = - "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={order_id=FieldSchema{name='order_id', typeString='BIGINT', defaultValue='null', comment='null'}, customer_id=FieldSchema{name='customer_id', typeString='BIGINT', defaultValue='null', comment='null'}, order_date=FieldSchema{name='order_date', typeString='DATETIMEV2', defaultValue='CURRENT_TIMESTAMP', comment='null'}, status=FieldSchema{name='status', typeString='VARCHAR(60)', defaultValue='null', comment='null'}, total_amount=FieldSchema{name='total_amount', typeString='DECIMALV3(12,2)', defaultValue='null', comment='null'}, shipping_address=FieldSchema{name='shipping_address', typeString='VARCHAR(765)', defaultValue='null', comment='null'}, delivery_date=FieldSchema{name='delivery_date', typeString='DATETIMEV2', defaultValue='null', comment='null'}}, keys=order_id, model=DUPLICATE, distributeKeys=order_id, properties={light_schema_change=true}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={order_id=FieldSchema{name='order_id', typeString='BIGINT', defaultValue='null', comment='null'}, customer_id=FieldSchema{name='customer_id', typeString='BIGINT', defaultValue='null', comment='null'}, order_date=FieldSchema{name='order_date', typeString='DATETIMEV2', defaultValue='CURRENT_TIMESTAMP', comment='null'}, status=FieldSchema{name='status', typeString='VARCHAR(80)', defaultValue='null', comment='null'}, total_amount=FieldSchema{name='total_amount', typeString='DECIMALV3(12,2)', defaultValue='null', comment='null'}, shipping_address=FieldSchema{name='shipping_address', typeString='VARCHAR(1020)', defaultValue='null', comment='null'}, delivery_date=FieldSchema{name='delivery_date', typeString='DATETIMEV2', defaultValue='null', comment='null'}}, keys=order_id, model=DUPLICATE, distributeKeys=order_id, properties={light_schema_change=true}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java index e66ecaab8..a7c053b6b 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java @@ -70,7 +70,7 @@ public void setUp() { @Test public void testExtractDDL() throws IOException, IllegalArgumentException { - String srcDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(600)"; + String srcDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(800)"; String record = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}"; JsonNode recordRoot = objectMapper.readTree(record); @@ -93,8 +93,8 @@ public void testSchemaChangeMultiTable() throws Exception { "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"mysql\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}"; String ddl2 = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"mysql\",\"sequence\":null,\"table\":\"t2\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}"; - String exceptDDL1 = "ALTER TABLE doris.t1 add COLUMN c_1 varchar(600)"; - String exceptDDL2 = "ALTER TABLE doris.t2 add COLUMN c_1 varchar(600)"; + String exceptDDL1 = "ALTER TABLE doris.t1 add COLUMN c_1 varchar(800)"; + String exceptDDL2 = "ALTER TABLE doris.t2 add COLUMN c_1 varchar(800)"; Assert.assertEquals(exceptDDL1, schemaChange.extractDDL(objectMapper.readTree(ddl1))); Assert.assertEquals(exceptDDL2, schemaChange.extractDDL(objectMapper.readTree(ddl2))); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java index fc3f6ffbd..4ae4e280e 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java @@ -199,7 +199,7 @@ public void testExtractDDLListRenameColumn() throws IOException { public void testFillOriginSchema() throws IOException { Map srcFiledSchemaMap = new LinkedHashMap<>(); srcFiledSchemaMap.put("id", new FieldSchema("id", "INT", null, null)); - srcFiledSchemaMap.put("name", new FieldSchema("name", "VARCHAR(150)", null, null)); + srcFiledSchemaMap.put("name", new FieldSchema("name", "VARCHAR(200)", null, null)); srcFiledSchemaMap.put( "test_time", new FieldSchema("test_time", "DATETIMEV2(0)", null, null)); srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "100", null)); @@ -313,7 +313,7 @@ private Map> buildSrcFiledSchema() { Map> scrFiledSchema = new LinkedHashMap<>(); Map filedSchemaMap1 = new LinkedHashMap<>(); filedSchemaMap1.put("id", new FieldSchema("id", "INT", null, null)); - filedSchemaMap1.put("name", new FieldSchema("name", "VARCHAR(150)", null, null)); + filedSchemaMap1.put("name", new FieldSchema("name", "VARCHAR(200)", null, null)); filedSchemaMap1.put("test_time", new FieldSchema("test_time", "DATETIMEV2(0)", null, null)); filedSchemaMap1.put("c1", new FieldSchema("c1", "INT", "100", null)); scrFiledSchema.put(tab1, filedSchemaMap1); @@ -321,7 +321,7 @@ private Map> buildSrcFiledSchema() { Map filedSchemaMap2 = new LinkedHashMap<>(); filedSchemaMap2.put("id", new FieldSchema("id", "INT", "10000", null)); filedSchemaMap2.put("c2", new FieldSchema("c2", "INT", null, null)); - filedSchemaMap2.put("c555", new FieldSchema("c555", "VARCHAR(300)", null, null)); + filedSchemaMap2.put("c555", new FieldSchema("c555", "VARCHAR(400)", null, null)); filedSchemaMap2.put("c666", new FieldSchema("c666", "INT", "100", null)); filedSchemaMap2.put("c4", new FieldSchema("c4", "BIGINT", "555", null)); filedSchemaMap2.put("c199", new FieldSchema("c199", "INT", null, null)); @@ -370,7 +370,7 @@ public void testBuildOracle2DorisTypeName() throws IOException { schemaChange.setSourceConnector("oracle"); JsonNode columns = objectMapper.readTree(columnInfo); String dorisTypeName = schemaChange.buildDorisTypeName(columns); - Assert.assertEquals(dorisTypeName, "VARCHAR(384)"); + Assert.assertEquals(dorisTypeName, "VARCHAR(512)"); columnInfo = "{\"name\":\"NAME\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"FLOAT\",\"typeExpression\":\"FLOAT\",\"charsetName\":null,\"length\":0,\"scale\":null,\"position\":2,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null}"; columns = objectMapper.readTree(columnInfo); @@ -385,7 +385,7 @@ public void testBuildPostgres2DorisTypeName() throws IOException { schemaChange.setSourceConnector("postgres"); JsonNode columns = objectMapper.readTree(columnInfo); String dorisTypeName = schemaChange.buildDorisTypeName(columns); - Assert.assertEquals(dorisTypeName, "VARCHAR(384)"); + Assert.assertEquals(dorisTypeName, "VARCHAR(512)"); } @Test @@ -395,7 +395,7 @@ public void testBuildSqlserver2DorisTypeName() throws IOException { schemaChange.setSourceConnector("sqlserver"); JsonNode columns = objectMapper.readTree(columnInfo); String dorisTypeName = schemaChange.buildDorisTypeName(columns); - Assert.assertEquals(dorisTypeName, "VARCHAR(384)"); + Assert.assertEquals(dorisTypeName, "VARCHAR(512)"); } @Test @@ -636,7 +636,7 @@ public void buildFieldSchemaTest() { Assert.assertTrue(result.containsKey("other_no")); fieldSchema = result.get("other_no"); Assert.assertEquals(fieldSchema.getName().toLowerCase(), "other_no"); - Assert.assertEquals(fieldSchema.getTypeString().toLowerCase(), "varchar(150)"); + Assert.assertEquals(fieldSchema.getTypeString().toLowerCase(), "varchar(200)"); Assert.assertEquals(fieldSchema.getDefaultValue().toLowerCase(), ""); Assert.assertEquals(fieldSchema.getComment(), "comment"); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java index a7958b70d..ecac98d76 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java @@ -150,7 +150,7 @@ public void testAutoCreateTable() throws IOException { schemaChange.tryParseCreateTableStatement(recordJsonNode, "doris.auto_tab"); String expected = - "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={`id`=FieldSchema{name='`id`', typeString='INT', defaultValue='10000', comment='id_test'}, `create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'}, `c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', typeString='DECIMALV3(9,3)', defaultValue='1.000', comment='decimal_tes'}, `aaa`=FieldSchema{name='`aaa`', typeString='VARCHAR(300)', defaultValue='NULL', comment='null'}, `decimal_type3`=FieldSchema{name='`decimal_type3`', typeString='DECIMALV3(38,9)', defaultValue='1.123456789', comment='comment_test'}, `create_time3`=FieldSchema{name='`create_time3`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='ttime_aaa'}}, keys=`id`, model=UNIQUE, distributeKeys=`id`, properties={}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={`id`=FieldSchema{name='`id`', typeString='INT', defaultValue='10000', comment='id_test'}, `create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'}, `c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', typeString='DECIMALV3(9,3)', defaultValue='1.000', comment='decimal_tes'}, `aaa`=FieldSchema{name='`aaa`', typeString='VARCHAR(400)', defaultValue='NULL', comment='null'}, `decimal_type3`=FieldSchema{name='`decimal_type3`', typeString='DECIMALV3(38,9)', defaultValue='1.123456789', comment='comment_test'}, `create_time3`=FieldSchema{name='`create_time3`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='ttime_aaa'}}, keys=`id`, model=UNIQUE, distributeKeys=`id`, properties={}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } @@ -162,7 +162,7 @@ public void testAutoCreateUniqueTable() throws IOException { TableSchema tableSchema = schemaChange.tryParseCreateTableStatement(recordJsonNode, "doris.auto_unique_tab"); String expected = - "TableSchema{database='doris', table='auto_unique_tab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(300)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', comment='null'}}, keys=email, model=UNIQUE, distributeKeys=email, properties={}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_unique_tab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(400)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(400)', defaultValue='null', comment='null'}}, keys=email, model=UNIQUE, distributeKeys=email, properties={}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } @@ -175,7 +175,7 @@ public void testAutoCreateDuplicateTable() throws IOException { schemaChange.tryParseCreateTableStatement( recordJsonNode, "doris.auto_duplicate_tab"); String expected = - "TableSchema{database='doris', table='auto_duplicate_tab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, address=FieldSchema{name='address', typeString='VARCHAR(765)', defaultValue='null', comment='null'}}, keys=id, model=DUPLICATE, distributeKeys=id, properties={}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_duplicate_tab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(200)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, address=FieldSchema{name='address', typeString='VARCHAR(1020)', defaultValue='null', comment='null'}}, keys=id, model=DUPLICATE, distributeKeys=id, properties={}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/db2/Db2TypeTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/db2/Db2TypeTest.java index 22656902c..f00aa7fe0 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/db2/Db2TypeTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/db2/Db2TypeTest.java @@ -36,10 +36,10 @@ public void db2FullTypeTest() { assertEquals("DECIMALV3(31,31)", Db2Type.toDorisType("DECIMAL", 31, 31)); assertEquals("DECIMALV3(31,0)", Db2Type.toDorisType("NUMERIC", 31, 0)); assertEquals("DECIMALV3(31,31)", Db2Type.toDorisType("NUMERIC", 31, 31)); - assertEquals("VARCHAR(600)", Db2Type.toDorisType("VARCHAR", 200, null)); + assertEquals("VARCHAR(800)", Db2Type.toDorisType("VARCHAR", 200, null)); assertEquals(DorisType.STRING, Db2Type.toDorisType("VARCHAR", 32672, null)); - assertEquals(DorisType.VARCHAR + "(3)", Db2Type.toDorisType("CHAR", 1, null)); - assertEquals(DorisType.VARCHAR + "(765)", Db2Type.toDorisType("CHAR", 255, null)); + assertEquals(DorisType.VARCHAR + "(4)", Db2Type.toDorisType("CHAR", 1, null)); + assertEquals(DorisType.VARCHAR + "(1020)", Db2Type.toDorisType("CHAR", 255, null)); assertEquals(DorisType.DATETIME_V2 + "(0)", Db2Type.toDorisType("TIMESTAMP", 26, 0)); assertEquals(DorisType.DATETIME_V2 + "(6)", Db2Type.toDorisType("TIMESTAMP", 26, 6)); assertEquals(DorisType.DATETIME_V2 + "(6)", Db2Type.toDorisType("TIMESTAMP", 26, 9));