diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md b/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
index 5e69fde7e95..39e4b5d488c 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
@@ -215,6 +215,13 @@ pipeline:
Duration |
StarRocks 侧执行 schema change 的超时时间,必须是秒的整数倍。超时后 StarRocks 将会取消 schema change,从而导致作业失败。 |
+
+ | unicode-char.max-bytes |
+ optional |
+ 3 |
+ Integer |
+ 将上游 CHAR 和 VARCHAR 类型映射到 StarRocks 时,为每个字符分配的最大字节数。由于 StarRocks 的长度以字节为单位,如果上游使用 utf8mb4,建议将该选项设置为 4,以避免低估目标列长度。默认值仍为 3,以保持向后兼容。 |
+
| sink.socket.timeout-ms |
optional |
@@ -320,22 +327,19 @@ pipeline:
|
- | CHAR(n) where n <= 85 |
- CHAR(n * 3) |
- CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC 中的长度对应到 StarRocks
- 中为 n * 3。由于 StarRocks CHAR 类型的最大长度为255,所以只有当 CDC 中长度不超过85时,才将 CDC CHAR 映射到 StarRocks CHAR。 |
+ CHAR(n),且 n * unicode-char.max-bytes <= 255,并且不是主键列 |
+ CHAR(n * unicode-char.max-bytes) |
+ CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。StarRocks 的长度按 n * unicode-char.max-bytes 计算。由于 StarRocks CHAR 类型的最大长度为 255,只有当计算后的长度不超过 255 时,才将 CDC CHAR 映射到 StarRocks CHAR。如果该列是主键列,则会映射为 VARCHAR。 |
- | CHAR(n) where n > 85 |
- VARCHAR(n * 3) |
- CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC 中的长度对应到 StarRocks
- 中为 n * 3。由于 StarRocks CHAR 类型的最大长度为255,所以当 CDC 中长度超过85时,才将 CDC CHAR 映射到 StarRocks VARCHAR。 |
+ CHAR(n),且 n * unicode-char.max-bytes > 255,或主键列 |
+ VARCHAR(min(n * unicode-char.max-bytes, 1048576)) |
+ CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。StarRocks 的长度按 n * unicode-char.max-bytes 计算。由于 StarRocks CHAR 类型的最大长度为 255,当计算后的长度超过 255 时,会将 CDC CHAR 映射到 StarRocks VARCHAR。主键 CHAR 列也会映射为 VARCHAR。 |
| VARCHAR(n) |
- VARCHAR(n * 3) |
- CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC 中的长度对应到 StarRocks
- 中为 n * 3。 |
+ VARCHAR(min(n * unicode-char.max-bytes, 1048576)) |
+ CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。StarRocks 的长度按 n * unicode-char.max-bytes 计算,并且最大不会超过 1048576。 |
| BINARY(n) |
diff --git a/docs/content/docs/connectors/pipeline-connectors/starrocks.md b/docs/content/docs/connectors/pipeline-connectors/starrocks.md
index 13297d3afe8..b36303be33a 100644
--- a/docs/content/docs/connectors/pipeline-connectors/starrocks.md
+++ b/docs/content/docs/connectors/pipeline-connectors/starrocks.md
@@ -222,6 +222,13 @@ pipeline:
seconds. StarRocks will cancel the schema change after timeout which will
cause the sink failure.
+
+ | unicode-char.max-bytes |
+ optional |
+ 3 |
+ Integer |
+ The maximum number of bytes allocated for each upstream character when mapping CHAR and VARCHAR types to StarRocks, whose length is measured in bytes. If the upstream source uses utf8mb4, set this option to 4 to avoid underestimating column lengths. The default value of 3 is retained for backward compatibility. |
+
| sink.socket.timeout-ms |
optional |
@@ -329,24 +336,19 @@ pipeline:
|
- | CHAR(n) where n <= 85 |
- CHAR(n * 3) |
- CDC defines the length by characters, and StarRocks defines it by bytes. According to UTF-8, one Chinese
- character is equal to three bytes, so the length for StarRocks is n * 3. Because the max length of StarRocks
- CHAR is 255, map CDC CHAR to StarRocks CHAR only when the CDC length is no larger than 85. |
+ CHAR(n) where n * unicode-char.max-bytes <= 255 and not primary key |
+ CHAR(n * unicode-char.max-bytes) |
+ CDC defines the length by characters, and StarRocks defines it by bytes. The StarRocks length is calculated as n * unicode-char.max-bytes. Because the max length of StarRocks CHAR is 255, map CDC CHAR to StarRocks CHAR only when the calculated length is no larger than 255. If the column is part of the primary key, it is mapped to VARCHAR instead. |
- | CHAR(n) where n > 85 |
- VARCHAR(n * 3) |
- CDC defines the length by characters, and StarRocks defines it by bytes. According to UTF-8, one Chinese
- character is equal to three bytes, so the length for StarRocks is n * 3. Because the max length of StarRocks
- CHAR is 255, map CDC CHAR to StarRocks VARCHAR if the CDC length is larger than 85. |
+ CHAR(n) where n * unicode-char.max-bytes > 255, or primary key |
+ VARCHAR(min(n * unicode-char.max-bytes, 1048576)) |
+ CDC defines the length by characters, and StarRocks defines it by bytes. The StarRocks length is calculated as n * unicode-char.max-bytes. Because the max length of StarRocks CHAR is 255, map CDC CHAR to StarRocks VARCHAR when the calculated length exceeds 255. Primary key CHAR columns are also mapped to VARCHAR. |
| VARCHAR(n) |
- VARCHAR(n * 3) |
- CDC defines the length by characters, and StarRocks defines it by bytes. According to UTF-8, one Chinese
- character is equal to three bytes, so the length for StarRocks is n * 3. |
+ VARCHAR(min(n * unicode-char.max-bytes, 1048576)) |
+ CDC defines the length by characters, and StarRocks defines it by bytes. The StarRocks length is calculated as n * unicode-char.max-bytes and capped at 1048576. |
| BINARY(n) |
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java
index fbb34cd518e..373594b00a3 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java
@@ -182,6 +182,7 @@ public Set> optionalOptions() {
optionalOptions.add(StarRocksDataSinkOptions.SINK_METRIC_HISTOGRAM_WINDOW_SIZE);
optionalOptions.add(StarRocksDataSinkOptions.TABLE_CREATE_NUM_BUCKETS);
optionalOptions.add(StarRocksDataSinkOptions.TABLE_SCHEMA_CHANGE_TIMEOUT);
+ optionalOptions.add(StarRocksDataSinkOptions.UNICODE_CHAR_MAX_BYTES);
return optionalOptions;
}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkOptions.java
index 5f7b7d8ab9a..6afd470aa82 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkOptions.java
@@ -157,4 +157,14 @@ public class StarRocksDataSinkOptions {
"Timeout for a schema change on StarRocks side, and must be an integral multiple of "
+ "seconds. StarRocks will cancel the schema change after timeout which will "
+ "cause the sink failure.");
+
+ public static final ConfigOption UNICODE_CHAR_MAX_BYTES =
+ ConfigOptions.key("unicode-char.max-bytes")
+ .intType()
+ .defaultValue(3)
+ .withDescription(
+ "Specifies how many bytes are allocated for each upstream character when mapping "
+ + "CHAR and VARCHAR types to StarRocks, whose length is measured in bytes. "
+ + "Valid values are within [1, 4]. If the upstream source uses utf8mb4, "
+ + "set this option to 4 to avoid underestimating column lengths.");
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
index 00dcb67b633..7f020844106 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
@@ -162,7 +162,7 @@ private void applyAddColumn(AddColumnEvent addColumnEvent) throws SchemaEvolveEx
.setDefaultValue(
StarRocksUtils.convertInvalidTimestampDefaultValue(
column.getDefaultValueExpression(), column.getType()));
- toStarRocksDataType(column, false, builder);
+ toStarRocksDataType(column, false, builder, tableCreateConfig.getUnicodeCharMaxBytes());
addColumns.add(builder.build());
}
@@ -321,7 +321,11 @@ private void applyAlterColumnType(AlterColumnTypeEvent event) throws SchemaEvolv
for (Map.Entry entry : typeMapping.entrySet()) {
StarRocksColumn.Builder builder =
new StarRocksColumn.Builder().setColumnName(entry.getKey());
- toStarRocksDataType(entry.getValue(), false, builder);
+ toStarRocksDataType(
+ entry.getValue(),
+ false,
+ builder,
+ tableCreateConfig.getUnicodeCharMaxBytes());
catalog.alterColumnType(
tableId.getSchemaName(), tableId.getTableName(), builder.build());
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
index f72f5ddc508..d2a50d4abf8 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
@@ -95,7 +95,11 @@ public static StarRocksTable toStarRocksTable(
.setDefaultValue(
convertInvalidTimestampDefaultValue(
column.getDefaultValueExpression(), column.getType()));
- toStarRocksDataType(column, i < primaryKeyCount, builder);
+ toStarRocksDataType(
+ column,
+ i < primaryKeyCount,
+ builder,
+ tableCreateConfig.getUnicodeCharMaxBytes());
starRocksColumns.add(builder.build());
}
@@ -119,14 +123,38 @@ public static StarRocksTable toStarRocksTable(
/** Convert CDC data type to StarRocks data type. */
public static void toStarRocksDataType(
DataType cdcDataType, boolean isPrimaryKeys, StarRocksColumn.Builder builder) {
+ toStarRocksDataType(
+ cdcDataType,
+ isPrimaryKeys,
+ builder,
+ StarRocksDataSinkOptions.UNICODE_CHAR_MAX_BYTES.defaultValue());
+ }
+
+ public static void toStarRocksDataType(
+ DataType cdcDataType,
+ boolean isPrimaryKeys,
+ StarRocksColumn.Builder builder,
+ int unicodeCharMaxBytes) {
CdcDataTypeTransformer dataTypeTransformer =
- new CdcDataTypeTransformer(isPrimaryKeys, builder);
+ new CdcDataTypeTransformer(isPrimaryKeys, builder, unicodeCharMaxBytes);
cdcDataType.accept(dataTypeTransformer);
}
public static void toStarRocksDataType(
Column cdcColumn, boolean isPrimaryKeys, StarRocksColumn.Builder builder) {
- toStarRocksDataType(cdcColumn.getType(), isPrimaryKeys, builder);
+ toStarRocksDataType(
+ cdcColumn,
+ isPrimaryKeys,
+ builder,
+ StarRocksDataSinkOptions.UNICODE_CHAR_MAX_BYTES.defaultValue());
+ }
+
+ public static void toStarRocksDataType(
+ Column cdcColumn,
+ boolean isPrimaryKeys,
+ StarRocksColumn.Builder builder,
+ int unicodeCharMaxBytes) {
+ toStarRocksDataType(cdcColumn.getType(), isPrimaryKeys, builder, unicodeCharMaxBytes);
}
/** Format DATE type data. */
@@ -297,10 +325,20 @@ public static class CdcDataTypeTransformer
private final StarRocksColumn.Builder builder;
private final boolean isPrimaryKeys;
+ private final int unicodeCharMaxBytes;
public CdcDataTypeTransformer(boolean isPrimaryKeys, StarRocksColumn.Builder builder) {
+ this(
+ isPrimaryKeys,
+ builder,
+ StarRocksDataSinkOptions.UNICODE_CHAR_MAX_BYTES.defaultValue());
+ }
+
+ public CdcDataTypeTransformer(
+ boolean isPrimaryKeys, StarRocksColumn.Builder builder, int unicodeCharMaxBytes) {
this.isPrimaryKeys = isPrimaryKeys;
this.builder = builder;
+ this.unicodeCharMaxBytes = unicodeCharMaxBytes;
}
@Override
@@ -379,13 +417,13 @@ public StarRocksColumn.Builder visit(DecimalType decimalType) {
@Override
public StarRocksColumn.Builder visit(CharType charType) {
// CDC and StarRocks use different units for the length. It's the number
- // of characters in CDC, and the number of bytes in StarRocks. One chinese
- // character will use 3 bytes because it uses UTF-8, so the length of StarRocks
- // char type should be three times as that of CDC char type. Specifically, if
- // the length of StarRocks exceeds the MAX_CHAR_SIZE, map CDC char type to StarRocks
- // varchar type
+ // of characters in CDC, and the number of bytes in StarRocks. The number
+ // of bytes needed for each character depends on the upstream encoding, so
+ // the length of StarRocks char type should be scaled by unicodeCharMaxBytes.
+ // Specifically, if the length of StarRocks exceeds the MAX_CHAR_SIZE, map
+ // CDC char type to StarRocks varchar type.
int length = charType.getLength();
- long starRocksLength = length * 3L;
+ long starRocksLength = (long) length * unicodeCharMaxBytes;
// In the StarRocks, The primary key columns can be any of the following data types:
// BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, LARGEINT, STRING, VARCHAR, DATE, and
// DATETIME, But it doesn't include CHAR. When a char type appears in the primary key of
@@ -405,11 +443,11 @@ public StarRocksColumn.Builder visit(CharType charType) {
@Override
public StarRocksColumn.Builder visit(VarCharType varCharType) {
// CDC and StarRocks use different units for the length. It's the number
- // of characters in CDC, and the number of bytes in StarRocks. One chinese
- // character will use 3 bytes because it uses UTF-8, so the length of StarRocks
- // varchar type should be three times as that of CDC varchar type.
+ // of characters in CDC, and the number of bytes in StarRocks. The number
+ // of bytes needed for each character depends on the upstream encoding, so
+ // the length of StarRocks varchar type should be scaled by unicodeCharMaxBytes.
int length = varCharType.getLength();
- long starRocksLength = length * 3L;
+ long starRocksLength = (long) length * unicodeCharMaxBytes;
builder.setDataType(VARCHAR);
builder.setNullable(varCharType.isNullable());
builder.setColumnSize((int) Math.min(starRocksLength, MAX_VARCHAR_SIZE));
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/TableCreateConfig.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/TableCreateConfig.java
index b3bae0f7b84..7eaeb477f21 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/TableCreateConfig.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/TableCreateConfig.java
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.connectors.starrocks.sink;
import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.utils.Preconditions;
import javax.annotation.Nullable;
@@ -44,9 +45,29 @@ public class TableCreateConfig implements Serializable {
/** Properties for the table. */
private final Map properties;
+ /** Maximum number of bytes a single character can take in UTF-8 encoding. */
+ public static final int MAX_UNICODE_CHAR_BYTES = 4;
+
+ /** Max bytes allocated for each upstream Unicode character during schema mapping. */
+ private final int unicodeCharMaxBytes;
+
public TableCreateConfig(@Nullable Integer numBuckets, Map properties) {
+ this(
+ numBuckets,
+ properties,
+ StarRocksDataSinkOptions.UNICODE_CHAR_MAX_BYTES.defaultValue());
+ }
+
+ public TableCreateConfig(
+ @Nullable Integer numBuckets, Map properties, int unicodeCharMaxBytes) {
+ Preconditions.checkArgument(
+ unicodeCharMaxBytes >= 1 && unicodeCharMaxBytes <= MAX_UNICODE_CHAR_BYTES,
+ "unicode-char.max-bytes must be between 1 and %s, but actually is %s",
+ MAX_UNICODE_CHAR_BYTES,
+ unicodeCharMaxBytes);
this.numBuckets = numBuckets;
this.properties = new HashMap<>(properties);
+ this.unicodeCharMaxBytes = unicodeCharMaxBytes;
}
public Optional getNumBuckets() {
@@ -57,6 +78,10 @@ public Map getProperties() {
return Collections.unmodifiableMap(properties);
}
+ public int getUnicodeCharMaxBytes() {
+ return unicodeCharMaxBytes;
+ }
+
public static TableCreateConfig from(Configuration config) {
Integer numBuckets = config.get(StarRocksDataSinkOptions.TABLE_CREATE_NUM_BUCKETS);
Map tableProperties =
@@ -77,6 +102,9 @@ public static TableCreateConfig from(Configuration config) {
.length())
.toLowerCase(),
Map.Entry::getValue));
- return new TableCreateConfig(numBuckets, tableProperties);
+ return new TableCreateConfig(
+ numBuckets,
+ tableProperties,
+ config.get(StarRocksDataSinkOptions.UNICODE_CHAR_MAX_BYTES));
}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java
index 8fe51a1c466..cdc34529ac0 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java
@@ -150,6 +150,58 @@ void testVarCharType() {
Assertions.assertThat(largeLengthColumn.isNullable()).isTrue();
}
+ @Test
+ void testCharTypeWithUnicodeCharMaxBytes() {
+ // With unicode-char.max-bytes = 4, a CHAR(17) should be sized 4 * 17 = 68 (<=
+ // MAX_CHAR_SIZE)
+ StarRocksColumn.Builder charBuilder =
+ new StarRocksColumn.Builder().setColumnName("utf8mb4_char").setOrdinalPosition(0);
+ new CharType(17).accept(new StarRocksUtils.CdcDataTypeTransformer(false, charBuilder, 4));
+ StarRocksColumn charColumn = charBuilder.build();
+ Assertions.assertThat(charColumn.getDataType()).isEqualTo(StarRocksUtils.CHAR);
+ Assertions.assertThat(charColumn.getColumnSize()).hasValue(68);
+
+ // 4 * 100 = 400 > MAX_CHAR_SIZE, so it should fall back to VARCHAR
+ StarRocksColumn.Builder overflowBuilder =
+ new StarRocksColumn.Builder().setColumnName("overflow_char").setOrdinalPosition(1);
+ new CharType(100)
+ .accept(new StarRocksUtils.CdcDataTypeTransformer(false, overflowBuilder, 4));
+ StarRocksColumn overflowColumn = overflowBuilder.build();
+ Assertions.assertThat(overflowColumn.getDataType()).isEqualTo(StarRocksUtils.VARCHAR);
+ Assertions.assertThat(overflowColumn.getColumnSize()).hasValue(400);
+
+ // Primary key CHAR is always mapped to VARCHAR, scaled by unicode-char.max-bytes
+ StarRocksColumn.Builder pkBuilder =
+ new StarRocksColumn.Builder().setColumnName("pk_char").setOrdinalPosition(2);
+ new CharType(17).accept(new StarRocksUtils.CdcDataTypeTransformer(true, pkBuilder, 4));
+ StarRocksColumn pkColumn = pkBuilder.build();
+ Assertions.assertThat(pkColumn.getDataType()).isEqualTo(StarRocksUtils.VARCHAR);
+ Assertions.assertThat(pkColumn.getColumnSize()).hasValue(68);
+ }
+
+ @Test
+ void testVarCharTypeWithUnicodeCharMaxBytes() {
+ // With unicode-char.max-bytes = 4, a VARCHAR(17) should be sized 4 * 17 = 68
+ StarRocksColumn.Builder builder =
+ new StarRocksColumn.Builder()
+ .setColumnName("utf8mb4_varchar")
+ .setOrdinalPosition(0);
+ new VarCharType(17).accept(new StarRocksUtils.CdcDataTypeTransformer(false, builder, 4));
+ StarRocksColumn column = builder.build();
+ Assertions.assertThat(column.getDataType()).isEqualTo(StarRocksUtils.VARCHAR);
+ Assertions.assertThat(column.getColumnSize()).hasValue(68);
+
+ // The result should still be capped at MAX_VARCHAR_SIZE
+ StarRocksColumn.Builder largeBuilder =
+ new StarRocksColumn.Builder().setColumnName("large_varchar").setOrdinalPosition(1);
+ new VarCharType(StarRocksUtils.MAX_VARCHAR_SIZE)
+ .accept(new StarRocksUtils.CdcDataTypeTransformer(false, largeBuilder, 4));
+ StarRocksColumn largeColumn = largeBuilder.build();
+ Assertions.assertThat(largeColumn.getDataType()).isEqualTo(StarRocksUtils.VARCHAR);
+ Assertions.assertThat(largeColumn.getColumnSize())
+ .hasValue(StarRocksUtils.MAX_VARCHAR_SIZE);
+ }
+
@Test
void testBinaryType() {
StarRocksColumn.Builder builder =
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
index fd1d3c06845..d9164d60cbd 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
@@ -270,6 +270,35 @@ void testStarRocksDataType() throws Exception {
assertEqualsInOrder(expected, actual);
}
+ @Test
+ void testStarRocksDataTypeWithUnicodeCharMaxBytes() throws Exception {
+ TableId tableId =
+ TableId.tableId(
+ StarRocksContainer.STARROCKS_DATABASE_NAME,
+ StarRocksContainer.STARROCKS_TABLE_NAME);
+ Schema schema =
+ Schema.newBuilder()
+ .column(new PhysicalColumn("id", DataTypes.INT().notNull(), "ID"))
+ .column(new PhysicalColumn("char", DataTypes.CHAR(17), "Char"))
+ .column(new PhysicalColumn("varchar", DataTypes.VARCHAR(17), "Var Char"))
+ .primaryKey("id")
+ .build();
+
+ runJobWithEvents(
+ Collections.singletonList(new CreateTableEvent(tableId, schema)),
+ new Configuration().set(StarRocksDataSinkOptions.UNICODE_CHAR_MAX_BYTES, 4));
+
+ List actual = inspectTableSchema(tableId);
+ List expected =
+ Arrays.asList(
+ "id | int | NO | true | null",
+ // 4 bytes per character instead of the default 3
+ "char | char(68) | YES | false | null",
+ "varchar | varchar(68) | YES | false | null");
+
+ assertEqualsInOrder(expected, actual);
+ }
+
@Test
void testStarRocksAddColumn() throws Exception {
TableId tableId =
@@ -426,6 +455,10 @@ public void testStarRocksDropTable() throws Exception {
}
private void runJobWithEvents(List events) throws Exception {
+ runJobWithEvents(events, new Configuration());
+ }
+
+ private void runJobWithEvents(List events, Configuration extraConfig) throws Exception {
DataStream stream = env.fromData(events, new EventTypeInfo()).setParallelism(1);
Configuration config =
@@ -434,6 +467,7 @@ private void runJobWithEvents(List events) throws Exception {
.set(JDBC_URL, STARROCKS_CONTAINER.getJdbcUrl())
.set(USERNAME, StarRocksContainer.STARROCKS_USERNAME)
.set(PASSWORD, StarRocksContainer.STARROCKS_PASSWORD);
+ config.addAll(extraConfig);
DataSink starRocksSink = createStarRocksDataSink(config);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtilsTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtilsTest.java
index cc99b3bc40d..e1caaba50af 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtilsTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtilsTest.java
@@ -194,6 +194,36 @@ void testToStarRocksTableWithProperties() {
assertThat(table.getProperties()).containsEntry("replication_num", "3");
}
+ @Test
+ void testToStarRocksTableWithUnicodeCharMaxBytes() {
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.BIGINT().notNull())
+ .physicalColumn("name", DataTypes.VARCHAR(17))
+ .physicalColumn("code", DataTypes.CHAR(17))
+ .primaryKey("id")
+ .build();
+
+ TableId tableId = TableId.tableId("db", "table");
+ // unicode-char.max-bytes = 4 to fit utf8mb4 characters
+ TableCreateConfig config = new TableCreateConfig(null, Collections.emptyMap(), 4);
+
+ StarRocksTable table = StarRocksUtils.toStarRocksTable(tableId, schema, config);
+
+ List columns = table.getColumns();
+ assertThat(columns).hasSize(3);
+
+ StarRocksColumn nameColumn = columns.get(1);
+ assertThat(nameColumn.getColumnName()).isEqualTo("name");
+ assertThat(nameColumn.getDataType()).isEqualTo(StarRocksUtils.VARCHAR);
+ assertThat(nameColumn.getColumnSize()).hasValue(68);
+
+ StarRocksColumn codeColumn = columns.get(2);
+ assertThat(codeColumn.getColumnName()).isEqualTo("code");
+ assertThat(codeColumn.getDataType()).isEqualTo(StarRocksUtils.CHAR);
+ assertThat(codeColumn.getColumnSize()).hasValue(68);
+ }
+
@Test
void testToStarRocksTableWithComment() {
Schema schema =