diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java index bfca184ec..899a84e05 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java @@ -155,7 +155,7 @@ private static class LogicalTypeVisitor extends LogicalTypeDefaultVisitor= MAX_VARCHAR_SIZE ? STRING : String.format("%s(%s)", VARCHAR, length); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java index 09f0f3a69..2bc8a981c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java @@ -148,12 +148,12 @@ private String handleType(String type) { if (type == null || "".equals(type)) { return ""; } - // varchar len * 3 + // varchar len * 4 Pattern pattern = Pattern.compile("varchar\\(([1-9][0-9]*)\\)", Pattern.CASE_INSENSITIVE); Matcher matcher = pattern.matcher(type); if (matcher.find()) { String len = matcher.group(1); - return String.format("varchar(%d)", Math.min(Integer.parseInt(len) * 3, 65533)); + return String.format("varchar(%d)", Math.min(Integer.parseInt(len) * 4, 65533)); } return type; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Type.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Type.java index 1255d1e7c..fed1988e9 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Type.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Type.java @@ -76,9 +76,9 @@ public static String toDorisType(String db2Type, Integer precision, Integer scal case VARCHAR: case LONG_VARCHAR: Preconditions.checkNotNull(precision); - return precision * 3 > 65533 + return precision * 4 > 65533 ? DorisType.STRING - : String.format("%s(%s)", DorisType.VARCHAR, precision * 3); + : String.format("%s(%s)", DorisType.VARCHAR, precision * 4); case TIMESTAMP: return String.format( "%s(%s)", DorisType.DATETIME_V2, Math.min(scale == null ? 0 : scale, 6)); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java index 49afe3751..ebd3009f0 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java @@ -192,9 +192,9 @@ public static String toDorisType(String type, Integer length, Integer scale) { case CHAR: case VARCHAR: Preconditions.checkNotNull(length); - return length * 3 > 65533 + return length * 4 > 65533 ? DorisType.STRING - : String.format("%s(%s)", DorisType.VARCHAR, length * 3); + : String.format("%s(%s)", DorisType.VARCHAR, length * 4); case TINYTEXT: case TEXT: case MEDIUMTEXT: diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleType.java index 304f21fd0..be687ab93 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleType.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleType.java @@ -91,9 +91,9 @@ public static String toDorisType(String oracleType, Integer precision, Integer s case NCHAR: case NVARCHAR2: Preconditions.checkNotNull(precision); - return precision * 3 > 65533 + return precision * 4 > 65533 ? DorisType.STRING - : String.format("%s(%s)", DorisType.VARCHAR, precision * 3); + : String.format("%s(%s)", DorisType.VARCHAR, precision * 4); case LONG: case RAW: case LONG_RAW: diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java index ddffb6d9f..9187b5229 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java @@ -107,9 +107,9 @@ public static String toDorisType(String postgresType, Integer precision, Integer case BPCHAR: case VARCHAR: Preconditions.checkNotNull(precision); - return precision * 3 > 65533 + return precision * 4 > 65533 ? DorisType.STRING - : String.format("%s(%s)", DorisType.VARCHAR, precision * 3); + : String.format("%s(%s)", DorisType.VARCHAR, precision * 4); case POINT: case LINE: case LSEG: diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java index ff37c06d5..c96eb5c4b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java @@ -94,9 +94,9 @@ public static String toDorisType(String originSqlServerType, Integer precision, case VARCHAR: case NCHAR: case NVARCHAR: - return precision * 3 > 65533 + return precision * 4 > 65533 ? DorisType.STRING - : String.format("%s(%s)", DorisType.VARCHAR, precision * 3); + : String.format("%s(%s)", DorisType.VARCHAR, precision * 4); case TEXT: case NTEXT: case TIME: diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisTypeMapperTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisTypeMapperTest.java index 84cd367c8..5217e9b86 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisTypeMapperTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisTypeMapperTest.java @@ -109,9 +109,9 @@ public void testCharType() { DataType result = DorisTypeMapper.toFlinkType("col", "CHAR", 10, 0); assertEquals(DataTypes.CHAR(10), result); String dorisType = DorisTypeMapper.toDorisType(DataTypes.CHAR(10)); - assertEquals("CHAR(30)", dorisType); + assertEquals("CHAR(40)", dorisType); dorisType = DorisTypeMapper.toDorisType(DataTypes.CHAR(100)); - assertEquals("VARCHAR(300)", dorisType); + assertEquals("VARCHAR(400)", dorisType); } @Test @@ -119,7 +119,7 @@ public void testVarcharType() { DataType result = DorisTypeMapper.toFlinkType("col", "VARCHAR", 50, 0); assertEquals(DataTypes.VARCHAR(50), result); String dorisType = DorisTypeMapper.toDorisType(DataTypes.VARCHAR(50)); - assertEquals("VARCHAR(150)", dorisType); + assertEquals("VARCHAR(200)", dorisType); } @Test diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java index c7b23c18e..77c8cb9df 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java @@ -68,8 +68,8 @@ public void testParserAlterDDLs() { @Test public void testParserAlterDDLsAdd() { List expectDDLs = new ArrayList<>(); - expectDDLs.add("ALTER TABLE `doris`.`tab` ADD COLUMN `phone_number` VARCHAR(60)"); - expectDDLs.add("ALTER TABLE `doris`.`tab` ADD COLUMN `address` VARCHAR(765)"); + expectDDLs.add("ALTER TABLE `doris`.`tab` ADD COLUMN `phone_number` VARCHAR(80)"); + expectDDLs.add("ALTER TABLE `doris`.`tab` ADD COLUMN `address` VARCHAR(1020)"); SourceConnector mysql = SourceConnector.ORACLE; String ddl = @@ -265,7 +265,7 @@ public void testParseCreateTableStatement() { SourceConnector.MYSQL, ddl, dorisTable, null); String expected = - "TableSchema{database='doris', table='auto_tab', tableComment='test_sinka', fields={`id`=FieldSchema{name='`id`', typeString='INT', defaultValue='10000', comment='id_test'}, `create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'}, `c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', typeString='DECIMALV3(9,3)', defaultValue='1.000', comment='decimal_tes'}, `aaa`=FieldSchema{name='`aaa`', typeString='VARCHAR(300)', defaultValue='NULL', comment='null'}, `decimal_type3`=FieldSchema{name='`decimal_type3`', typeString='DECIMALV3(38,9)', defaultValue='1.123456789', comment='comment_test'}, `create_time3`=FieldSchema{name='`create_time3`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='ttime_aaa'}}, keys=`id`, model=UNIQUE, distributeKeys=`id`, properties={}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_tab', tableComment='test_sinka', fields={`id`=FieldSchema{name='`id`', typeString='INT', defaultValue='10000', comment='id_test'}, `create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'}, `c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', typeString='DECIMALV3(9,3)', defaultValue='1.000', comment='decimal_tes'}, `aaa`=FieldSchema{name='`aaa`', typeString='VARCHAR(400)', defaultValue='NULL', comment='null'}, `decimal_type3`=FieldSchema{name='`decimal_type3`', typeString='DECIMALV3(38,9)', defaultValue='1.123456789', comment='comment_test'}, `create_time3`=FieldSchema{name='`create_time3`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='ttime_aaa'}}, keys=`id`, model=UNIQUE, distributeKeys=`id`, properties={}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } @@ -289,7 +289,7 @@ public void testParseCreateTableUnsignedStatement() { SourceConnector.MYSQL, ddl, dorisTable, null); String expected = - "TableSchema{database='doris', table='auto_tab', tableComment='test_sinka', fields={`id`=FieldSchema{name='`id`', typeString='BIGINT', defaultValue='10000', comment='id_test'}, `id2`=FieldSchema{name='`id2`', typeString='LARGEINT', defaultValue='10000', comment='id2_comment'}, `create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'}, `c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', typeString='DECIMALV3(9,3)', defaultValue='1.000', comment='decimal_tes'}, `aaa`=FieldSchema{name='`aaa`', typeString='VARCHAR(300)', defaultValue='NULL', comment='null'}, `decimal_type3`=FieldSchema{name='`decimal_type3`', typeString='DECIMALV3(38,9)', defaultValue='1.123456789', comment='comment_test'}, `create_time3`=FieldSchema{name='`create_time3`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='ttime_aaa'}}, keys=`id`, model=UNIQUE, distributeKeys=`id`, properties={}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_tab', tableComment='test_sinka', fields={`id`=FieldSchema{name='`id`', typeString='BIGINT', defaultValue='10000', comment='id_test'}, `id2`=FieldSchema{name='`id2`', typeString='LARGEINT', defaultValue='10000', comment='id2_comment'}, `create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'}, `c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', typeString='DECIMALV3(9,3)', defaultValue='1.000', comment='decimal_tes'}, `aaa`=FieldSchema{name='`aaa`', typeString='VARCHAR(400)', defaultValue='NULL', comment='null'}, `decimal_type3`=FieldSchema{name='`decimal_type3`', typeString='DECIMALV3(38,9)', defaultValue='1.123456789', comment='comment_test'}, `create_time3`=FieldSchema{name='`create_time3`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='ttime_aaa'}}, keys=`id`, model=UNIQUE, distributeKeys=`id`, properties={}, tableBuckets=null}"; System.out.println(tableSchema.toString()); Assert.assertEquals(expected, tableSchema.toString()); } @@ -304,7 +304,7 @@ public void testParseCreateUniqueTableStatement() { SourceConnector.MYSQL, ddl, dorisTable, null); String expected = - "TableSchema{database='doris', table='auto_uni_tab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(300)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', comment='null'}}, keys=email, model=UNIQUE, distributeKeys=email, properties={}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_uni_tab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(400)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(400)', defaultValue='null', comment='null'}}, keys=email, model=UNIQUE, distributeKeys=email, properties={}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } @@ -323,7 +323,7 @@ public void testParseCreateDuplicateTableStatement() { SourceConnector.MYSQL, ddl, dorisTable, null); String expected = - "TableSchema{database='doris', table='auto_duptab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, address=FieldSchema{name='address', typeString='VARCHAR(765)', defaultValue='null', comment='null'}}, keys=id, model=DUPLICATE, distributeKeys=id, properties={}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_duptab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(200)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, address=FieldSchema{name='address', typeString='VARCHAR(1020)', defaultValue='null', comment='null'}}, keys=id, model=DUPLICATE, distributeKeys=id, properties={}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } @@ -352,7 +352,7 @@ public void testParseOracleTableStatement() { SourceConnector.ORACLE, ddl, dorisTable, null); String expected = - "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={employee_id=FieldSchema{name='employee_id', typeString='BIGINT', defaultValue='null', comment='null'}, first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', comment='null'}, phone_number=FieldSchema{name='phone_number', typeString='VARCHAR(60)', defaultValue='null', comment='null'}, hire_date=FieldSchema{name='hire_date', typeString='DATETIMEV2', defaultValue='CURRENT_TIMESTAMP', comment='null'}, job_id=FieldSchema{name='job_id', typeString='VARCHAR(30)', defaultValue='null', comment='null'}, salary=FieldSchema{name='salary', typeString='DECIMALV3(8,2)', defaultValue='null', comment='null'}, commission_pct=FieldSchema{name='commission_pct', typeString='DECIMALV3(2,2)', defaultValue='null', comment='null'}, manager_id=FieldSchema{name='manager_id', typeString='BIGINT', defaultValue='null', comment='null'}, department_id=FieldSchema{name='department_id', typeString='BIGINT', defaultValue='null', comment='null'}}, keys=employee_id, model=UNIQUE, distributeKeys=employee_id, properties={}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={employee_id=FieldSchema{name='employee_id', typeString='BIGINT', defaultValue='null', comment='null'}, first_name=FieldSchema{name='first_name', typeString='VARCHAR(200)', defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', typeString='VARCHAR(200)', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(400)', defaultValue='null', comment='null'}, phone_number=FieldSchema{name='phone_number', typeString='VARCHAR(80)', defaultValue='null', comment='null'}, hire_date=FieldSchema{name='hire_date', typeString='DATETIMEV2', defaultValue='CURRENT_TIMESTAMP', comment='null'}, job_id=FieldSchema{name='job_id', typeString='VARCHAR(40)', defaultValue='null', comment='null'}, salary=FieldSchema{name='salary', typeString='DECIMALV3(8,2)', defaultValue='null', comment='null'}, commission_pct=FieldSchema{name='commission_pct', typeString='DECIMALV3(2,2)', defaultValue='null', comment='null'}, manager_id=FieldSchema{name='manager_id', typeString='BIGINT', defaultValue='null', comment='null'}, department_id=FieldSchema{name='department_id', typeString='BIGINT', defaultValue='null', comment='null'}}, keys=employee_id, model=UNIQUE, distributeKeys=employee_id, properties={}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } @@ -378,7 +378,7 @@ public void testParseOraclePrimaryTableStatement() { SourceConnector.ORACLE, ddl, dorisTable, null); String expected = - "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={employee_id=FieldSchema{name='employee_id', typeString='BIGINT', defaultValue='null', comment='null'}, first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', comment='null'}, phone_number=FieldSchema{name='phone_number', typeString='VARCHAR(60)', defaultValue='null', comment='null'}, hire_date=FieldSchema{name='hire_date', typeString='DATETIMEV2', defaultValue='CURRENT_TIMESTAMP', comment='null'}, job_id=FieldSchema{name='job_id', typeString='VARCHAR(30)', defaultValue='null', comment='null'}, salary=FieldSchema{name='salary', typeString='DECIMALV3(8,2)', defaultValue='null', comment='null'}, commission_pct=FieldSchema{name='commission_pct', typeString='DECIMALV3(2,2)', defaultValue='null', comment='null'}, manager_id=FieldSchema{name='manager_id', typeString='BIGINT', defaultValue='null', comment='null'}, department_id=FieldSchema{name='department_id', typeString='BIGINT', defaultValue='null', comment='null'}}, keys=employee_id, model=UNIQUE, distributeKeys=employee_id, properties={}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={employee_id=FieldSchema{name='employee_id', typeString='BIGINT', defaultValue='null', comment='null'}, first_name=FieldSchema{name='first_name', typeString='VARCHAR(200)', defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', typeString='VARCHAR(200)', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(400)', defaultValue='null', comment='null'}, phone_number=FieldSchema{name='phone_number', typeString='VARCHAR(80)', defaultValue='null', comment='null'}, hire_date=FieldSchema{name='hire_date', typeString='DATETIMEV2', defaultValue='CURRENT_TIMESTAMP', comment='null'}, job_id=FieldSchema{name='job_id', typeString='VARCHAR(40)', defaultValue='null', comment='null'}, salary=FieldSchema{name='salary', typeString='DECIMALV3(8,2)', defaultValue='null', comment='null'}, commission_pct=FieldSchema{name='commission_pct', typeString='DECIMALV3(2,2)', defaultValue='null', comment='null'}, manager_id=FieldSchema{name='manager_id', typeString='BIGINT', defaultValue='null', comment='null'}, department_id=FieldSchema{name='department_id', typeString='BIGINT', defaultValue='null', comment='null'}}, keys=employee_id, model=UNIQUE, distributeKeys=employee_id, properties={}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } @@ -405,7 +405,7 @@ public void testParseOracleDuplicateTableStatement() { dorisTable, new DorisTableConfig(new HashMap<>())); String expected = - "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={order_id=FieldSchema{name='order_id', typeString='BIGINT', defaultValue='null', comment='null'}, customer_id=FieldSchema{name='customer_id', typeString='BIGINT', defaultValue='null', comment='null'}, order_date=FieldSchema{name='order_date', typeString='DATETIMEV2', defaultValue='CURRENT_TIMESTAMP', comment='null'}, status=FieldSchema{name='status', typeString='VARCHAR(60)', defaultValue='null', comment='null'}, total_amount=FieldSchema{name='total_amount', typeString='DECIMALV3(12,2)', defaultValue='null', comment='null'}, shipping_address=FieldSchema{name='shipping_address', typeString='VARCHAR(765)', defaultValue='null', comment='null'}, delivery_date=FieldSchema{name='delivery_date', typeString='DATETIMEV2', defaultValue='null', comment='null'}}, keys=order_id, model=DUPLICATE, distributeKeys=order_id, properties={light_schema_change=true}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={order_id=FieldSchema{name='order_id', typeString='BIGINT', defaultValue='null', comment='null'}, customer_id=FieldSchema{name='customer_id', typeString='BIGINT', defaultValue='null', comment='null'}, order_date=FieldSchema{name='order_date', typeString='DATETIMEV2', defaultValue='CURRENT_TIMESTAMP', comment='null'}, status=FieldSchema{name='status', typeString='VARCHAR(80)', defaultValue='null', comment='null'}, total_amount=FieldSchema{name='total_amount', typeString='DECIMALV3(12,2)', defaultValue='null', comment='null'}, shipping_address=FieldSchema{name='shipping_address', typeString='VARCHAR(1020)', defaultValue='null', comment='null'}, delivery_date=FieldSchema{name='delivery_date', typeString='DATETIMEV2', defaultValue='null', comment='null'}}, keys=order_id, model=DUPLICATE, distributeKeys=order_id, properties={light_schema_change=true}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java index e66ecaab8..a7c053b6b 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java @@ -70,7 +70,7 @@ public void setUp() { @Test public void testExtractDDL() throws IOException, IllegalArgumentException { - String srcDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(600)"; + String srcDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(800)"; String record = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}"; JsonNode recordRoot = objectMapper.readTree(record); @@ -93,8 +93,8 @@ public void testSchemaChangeMultiTable() throws Exception { "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"mysql\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}"; String ddl2 = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"mysql\",\"sequence\":null,\"table\":\"t2\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}"; - String exceptDDL1 = "ALTER TABLE doris.t1 add COLUMN c_1 varchar(600)"; - String exceptDDL2 = "ALTER TABLE doris.t2 add COLUMN c_1 varchar(600)"; + String exceptDDL1 = "ALTER TABLE doris.t1 add COLUMN c_1 varchar(800)"; + String exceptDDL2 = "ALTER TABLE doris.t2 add COLUMN c_1 varchar(800)"; Assert.assertEquals(exceptDDL1, schemaChange.extractDDL(objectMapper.readTree(ddl1))); Assert.assertEquals(exceptDDL2, schemaChange.extractDDL(objectMapper.readTree(ddl2))); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java index fc3f6ffbd..4ae4e280e 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java @@ -199,7 +199,7 @@ public void testExtractDDLListRenameColumn() throws IOException { public void testFillOriginSchema() throws IOException { Map srcFiledSchemaMap = new LinkedHashMap<>(); srcFiledSchemaMap.put("id", new FieldSchema("id", "INT", null, null)); - srcFiledSchemaMap.put("name", new FieldSchema("name", "VARCHAR(150)", null, null)); + srcFiledSchemaMap.put("name", new FieldSchema("name", "VARCHAR(200)", null, null)); srcFiledSchemaMap.put( "test_time", new FieldSchema("test_time", "DATETIMEV2(0)", null, null)); srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "100", null)); @@ -313,7 +313,7 @@ private Map> buildSrcFiledSchema() { Map> scrFiledSchema = new LinkedHashMap<>(); Map filedSchemaMap1 = new LinkedHashMap<>(); filedSchemaMap1.put("id", new FieldSchema("id", "INT", null, null)); - filedSchemaMap1.put("name", new FieldSchema("name", "VARCHAR(150)", null, null)); + filedSchemaMap1.put("name", new FieldSchema("name", "VARCHAR(200)", null, null)); filedSchemaMap1.put("test_time", new FieldSchema("test_time", "DATETIMEV2(0)", null, null)); filedSchemaMap1.put("c1", new FieldSchema("c1", "INT", "100", null)); scrFiledSchema.put(tab1, filedSchemaMap1); @@ -321,7 +321,7 @@ private Map> buildSrcFiledSchema() { Map filedSchemaMap2 = new LinkedHashMap<>(); filedSchemaMap2.put("id", new FieldSchema("id", "INT", "10000", null)); filedSchemaMap2.put("c2", new FieldSchema("c2", "INT", null, null)); - filedSchemaMap2.put("c555", new FieldSchema("c555", "VARCHAR(300)", null, null)); + filedSchemaMap2.put("c555", new FieldSchema("c555", "VARCHAR(400)", null, null)); filedSchemaMap2.put("c666", new FieldSchema("c666", "INT", "100", null)); filedSchemaMap2.put("c4", new FieldSchema("c4", "BIGINT", "555", null)); filedSchemaMap2.put("c199", new FieldSchema("c199", "INT", null, null)); @@ -370,7 +370,7 @@ public void testBuildOracle2DorisTypeName() throws IOException { schemaChange.setSourceConnector("oracle"); JsonNode columns = objectMapper.readTree(columnInfo); String dorisTypeName = schemaChange.buildDorisTypeName(columns); - Assert.assertEquals(dorisTypeName, "VARCHAR(384)"); + Assert.assertEquals(dorisTypeName, "VARCHAR(512)"); columnInfo = "{\"name\":\"NAME\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"FLOAT\",\"typeExpression\":\"FLOAT\",\"charsetName\":null,\"length\":0,\"scale\":null,\"position\":2,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null}"; columns = objectMapper.readTree(columnInfo); @@ -385,7 +385,7 @@ public void testBuildPostgres2DorisTypeName() throws IOException { schemaChange.setSourceConnector("postgres"); JsonNode columns = objectMapper.readTree(columnInfo); String dorisTypeName = schemaChange.buildDorisTypeName(columns); - Assert.assertEquals(dorisTypeName, "VARCHAR(384)"); + Assert.assertEquals(dorisTypeName, "VARCHAR(512)"); } @Test @@ -395,7 +395,7 @@ public void testBuildSqlserver2DorisTypeName() throws IOException { schemaChange.setSourceConnector("sqlserver"); JsonNode columns = objectMapper.readTree(columnInfo); String dorisTypeName = schemaChange.buildDorisTypeName(columns); - Assert.assertEquals(dorisTypeName, "VARCHAR(384)"); + Assert.assertEquals(dorisTypeName, "VARCHAR(512)"); } @Test @@ -636,7 +636,7 @@ public void buildFieldSchemaTest() { Assert.assertTrue(result.containsKey("other_no")); fieldSchema = result.get("other_no"); Assert.assertEquals(fieldSchema.getName().toLowerCase(), "other_no"); - Assert.assertEquals(fieldSchema.getTypeString().toLowerCase(), "varchar(150)"); + Assert.assertEquals(fieldSchema.getTypeString().toLowerCase(), "varchar(200)"); Assert.assertEquals(fieldSchema.getDefaultValue().toLowerCase(), ""); Assert.assertEquals(fieldSchema.getComment(), "comment"); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java index a7958b70d..ecac98d76 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java @@ -150,7 +150,7 @@ public void testAutoCreateTable() throws IOException { schemaChange.tryParseCreateTableStatement(recordJsonNode, "doris.auto_tab"); String expected = - "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={`id`=FieldSchema{name='`id`', typeString='INT', defaultValue='10000', comment='id_test'}, `create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'}, `c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', typeString='DECIMALV3(9,3)', defaultValue='1.000', comment='decimal_tes'}, `aaa`=FieldSchema{name='`aaa`', typeString='VARCHAR(300)', defaultValue='NULL', comment='null'}, `decimal_type3`=FieldSchema{name='`decimal_type3`', typeString='DECIMALV3(38,9)', defaultValue='1.123456789', comment='comment_test'}, `create_time3`=FieldSchema{name='`create_time3`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='ttime_aaa'}}, keys=`id`, model=UNIQUE, distributeKeys=`id`, properties={}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={`id`=FieldSchema{name='`id`', typeString='INT', defaultValue='10000', comment='id_test'}, `create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'}, `c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', typeString='DECIMALV3(9,3)', defaultValue='1.000', comment='decimal_tes'}, `aaa`=FieldSchema{name='`aaa`', typeString='VARCHAR(400)', defaultValue='NULL', comment='null'}, `decimal_type3`=FieldSchema{name='`decimal_type3`', typeString='DECIMALV3(38,9)', defaultValue='1.123456789', comment='comment_test'}, `create_time3`=FieldSchema{name='`create_time3`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='ttime_aaa'}}, keys=`id`, model=UNIQUE, distributeKeys=`id`, properties={}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } @@ -162,7 +162,7 @@ public void testAutoCreateUniqueTable() throws IOException { TableSchema tableSchema = schemaChange.tryParseCreateTableStatement(recordJsonNode, "doris.auto_unique_tab"); String expected = - "TableSchema{database='doris', table='auto_unique_tab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(300)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', comment='null'}}, keys=email, model=UNIQUE, distributeKeys=email, properties={}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_unique_tab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(400)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(400)', defaultValue='null', comment='null'}}, keys=email, model=UNIQUE, distributeKeys=email, properties={}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } @@ -175,7 +175,7 @@ public void testAutoCreateDuplicateTable() throws IOException { schemaChange.tryParseCreateTableStatement( recordJsonNode, "doris.auto_duplicate_tab"); String expected = - "TableSchema{database='doris', table='auto_duplicate_tab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, address=FieldSchema{name='address', typeString='VARCHAR(765)', defaultValue='null', comment='null'}}, keys=id, model=DUPLICATE, distributeKeys=id, properties={}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_duplicate_tab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(200)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, address=FieldSchema{name='address', typeString='VARCHAR(1020)', defaultValue='null', comment='null'}}, keys=id, model=DUPLICATE, distributeKeys=id, properties={}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/db2/Db2TypeTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/db2/Db2TypeTest.java index 22656902c..f00aa7fe0 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/db2/Db2TypeTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/db2/Db2TypeTest.java @@ -36,10 +36,10 @@ public void db2FullTypeTest() { assertEquals("DECIMALV3(31,31)", Db2Type.toDorisType("DECIMAL", 31, 31)); assertEquals("DECIMALV3(31,0)", Db2Type.toDorisType("NUMERIC", 31, 0)); assertEquals("DECIMALV3(31,31)", Db2Type.toDorisType("NUMERIC", 31, 31)); - assertEquals("VARCHAR(600)", Db2Type.toDorisType("VARCHAR", 200, null)); + assertEquals("VARCHAR(800)", Db2Type.toDorisType("VARCHAR", 200, null)); assertEquals(DorisType.STRING, Db2Type.toDorisType("VARCHAR", 32672, null)); - assertEquals(DorisType.VARCHAR + "(3)", Db2Type.toDorisType("CHAR", 1, null)); - assertEquals(DorisType.VARCHAR + "(765)", Db2Type.toDorisType("CHAR", 255, null)); + assertEquals(DorisType.VARCHAR + "(4)", Db2Type.toDorisType("CHAR", 1, null)); + assertEquals(DorisType.VARCHAR + "(1020)", Db2Type.toDorisType("CHAR", 255, null)); assertEquals(DorisType.DATETIME_V2 + "(0)", Db2Type.toDorisType("TIMESTAMP", 26, 0)); assertEquals(DorisType.DATETIME_V2 + "(6)", Db2Type.toDorisType("TIMESTAMP", 26, 6)); assertEquals(DorisType.DATETIME_V2 + "(6)", Db2Type.toDorisType("TIMESTAMP", 26, 9));