Commit 504a4b4

haruki-830, 春栖, and claude authored
[FLINK-39759][pipeline-connector/starrocks] Fix CHAR/VARCHAR mapping for utf8mb4 characters (#4447)
This closes #4447.
Co-authored-by: 春栖 <chunxi.mjy@U-4KXDP7CK-0015.local>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent b451d9a commit 504a4b4

10 files changed

Lines changed: 243 additions & 40 deletions

docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md

Lines changed: 15 additions & 11 deletions
@@ -215,6 +215,13 @@ pipeline:
       <td>Duration</td>
       <td>Timeout for executing a schema change on the StarRocks side; it must be an integral multiple of seconds. After the timeout, StarRocks cancels the schema change, which causes the job to fail.</td>
     </tr>
+    <tr>
+      <td>unicode-char.max-bytes</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">3</td>
+      <td>Integer</td>
+      <td>The maximum number of bytes allocated for each character when mapping upstream CHAR and VARCHAR types to StarRocks. Because StarRocks measures length in bytes, setting this option to 4 is recommended when the upstream uses utf8mb4, to avoid underestimating the target column length. The default remains 3 for backward compatibility.</td>
+    </tr>
     <tr>
       <td>sink.socket.timeout-ms</td>
       <td>optional</td>
@@ -320,22 +327,19 @@ pipeline:
       <td></td>
     </tr>
     <tr>
-      <td>CHAR(n) where n <= 85</td>
-      <td>CHAR(n * 3)</td>
-      <td>In CDC the length is a number of characters, while in StarRocks it is a number of bytes. In UTF-8 a Chinese character takes three bytes, so a CDC length of n corresponds in StarRocks
-        to n * 3. Because the maximum length of the StarRocks CHAR type is 255, CDC CHAR is mapped to StarRocks CHAR only when the CDC length does not exceed 85.</td>
+      <td>CHAR(n) where n * unicode-char.max-bytes <= 255 and the column is not a primary key column</td>
+      <td>CHAR(n * unicode-char.max-bytes)</td>
+      <td>In CDC the length is a number of characters, while in StarRocks it is a number of bytes. The StarRocks length is computed as n * unicode-char.max-bytes. Because the maximum length of the StarRocks CHAR type is 255, CDC CHAR is mapped to StarRocks CHAR only when the computed length does not exceed 255. If the column is a primary key column, it is mapped to VARCHAR.</td>
     </tr>
     <tr>
-      <td>CHAR(n) where n > 85</td>
-      <td>VARCHAR(n * 3)</td>
-      <td>In CDC the length is a number of characters, while in StarRocks it is a number of bytes. In UTF-8 a Chinese character takes three bytes, so a CDC length of n corresponds in StarRocks
-        to n * 3. Because the maximum length of the StarRocks CHAR type is 255, CDC CHAR is mapped to StarRocks VARCHAR when the CDC length exceeds 85.</td>
+      <td>CHAR(n) where n * unicode-char.max-bytes > 255, or a primary key column</td>
+      <td>VARCHAR(min(n * unicode-char.max-bytes, 1048576))</td>
+      <td>In CDC the length is a number of characters, while in StarRocks it is a number of bytes. The StarRocks length is computed as n * unicode-char.max-bytes. Because the maximum length of the StarRocks CHAR type is 255, CDC CHAR is mapped to StarRocks VARCHAR when the computed length exceeds 255. Primary key CHAR columns are also mapped to VARCHAR.</td>
     </tr>
     <tr>
       <td>VARCHAR(n)</td>
-      <td>VARCHAR(n * 3)</td>
-      <td>In CDC the length is a number of characters, while in StarRocks it is a number of bytes. In UTF-8 a Chinese character takes three bytes, so a CDC length of n corresponds in StarRocks
-        to n * 3.</td>
+      <td>VARCHAR(min(n * unicode-char.max-bytes, 1048576))</td>
+      <td>In CDC the length is a number of characters, while in StarRocks it is a number of bytes. The StarRocks length is computed as n * unicode-char.max-bytes and never exceeds 1048576.</td>
     </tr>
     <tr>
       <td>BINARY(n)</td>

docs/content/docs/connectors/pipeline-connectors/starrocks.md

Lines changed: 15 additions & 13 deletions
@@ -222,6 +222,13 @@ pipeline:
         seconds. StarRocks will cancel the schema change after timeout which will
         cause the sink failure. </td>
     </tr>
+    <tr>
+      <td>unicode-char.max-bytes</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">3</td>
+      <td>Integer</td>
+      <td>The maximum number of bytes allocated for each upstream character when mapping CHAR and VARCHAR types to StarRocks, whose length is measured in bytes. If the upstream source uses utf8mb4, set this option to 4 to avoid underestimating column lengths. The default value of 3 is retained for backward compatibility.</td>
+    </tr>
     <tr>
       <td>sink.socket.timeout-ms</td>
       <td>optional</td>
@@ -329,24 +336,19 @@ pipeline:
       <td></td>
     </tr>
     <tr>
-      <td>CHAR(n) where n <= 85</td>
-      <td>CHAR(n * 3)</td>
-      <td>CDC defines the length by characters, and StarRocks defines it by bytes. According to UTF-8, one Chinese
-        character is equal to three bytes, so the length for StarRocks is n * 3. Because the max length of StarRocks
-        CHAR is 255, map CDC CHAR to StarRocks CHAR only when the CDC length is no larger than 85.</td>
+      <td>CHAR(n) where n * unicode-char.max-bytes <= 255 and not primary key</td>
+      <td>CHAR(n * unicode-char.max-bytes)</td>
+      <td>CDC defines the length by characters, and StarRocks defines it by bytes. The StarRocks length is calculated as n * unicode-char.max-bytes. Because the max length of StarRocks CHAR is 255, map CDC CHAR to StarRocks CHAR only when the calculated length is no larger than 255. If the column is part of the primary key, it is mapped to VARCHAR instead.</td>
     </tr>
     <tr>
-      <td>CHAR(n) where n > 85</td>
-      <td>VARCHAR(n * 3)</td>
-      <td>CDC defines the length by characters, and StarRocks defines it by bytes. According to UTF-8, one Chinese
-        character is equal to three bytes, so the length for StarRocks is n * 3. Because the max length of StarRocks
-        CHAR is 255, map CDC CHAR to StarRocks VARCHAR if the CDC length is larger than 85.</td>
+      <td>CHAR(n) where n * unicode-char.max-bytes > 255, or primary key</td>
+      <td>VARCHAR(min(n * unicode-char.max-bytes, 1048576))</td>
+      <td>CDC defines the length by characters, and StarRocks defines it by bytes. The StarRocks length is calculated as n * unicode-char.max-bytes. Because the max length of StarRocks CHAR is 255, map CDC CHAR to StarRocks VARCHAR when the calculated length exceeds 255. Primary key CHAR columns are also mapped to VARCHAR.</td>
     </tr>
     <tr>
       <td>VARCHAR(n)</td>
-      <td>VARCHAR(n * 3)</td>
-      <td>CDC defines the length by characters, and StarRocks defines it by bytes. According to UTF-8, one Chinese
-        character is equal to three bytes, so the length for StarRocks is n * 3.</td>
+      <td>VARCHAR(min(n * unicode-char.max-bytes, 1048576))</td>
+      <td>CDC defines the length by characters, and StarRocks defines it by bytes. The StarRocks length is calculated as n * unicode-char.max-bytes and capped at 1048576.</td>
     </tr>
     <tr>
       <td>BINARY(n)</td>
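
The mapping rows documented above can be read as a small decision rule. The following is a minimal stand-alone sketch of that rule, not the connector's code: the class `CharMappingSketch` and its methods are hypothetical names, and the two limits (255 and 1048576) are taken from the table text.

```java
/** Hypothetical sketch of the documented CHAR/VARCHAR length rule; not connector code. */
final class CharMappingSketch {
    // Limits as stated in the mapping table: CHAR holds at most 255 bytes,
    // and VARCHAR lengths are capped at 1048576 bytes.
    static final int MAX_CHAR_SIZE = 255;
    static final int MAX_VARCHAR_SIZE = 1048576;

    /** Returns the StarRocks type that an upstream CHAR(n) would map to. */
    static String mapChar(int n, int unicodeCharMaxBytes, boolean isPrimaryKey) {
        // Widen to long first so that n * unicodeCharMaxBytes cannot overflow int.
        long bytes = (long) n * unicodeCharMaxBytes;
        if (bytes <= MAX_CHAR_SIZE && !isPrimaryKey) {
            return "CHAR(" + bytes + ")";
        }
        // Too long for CHAR, or a primary key column: fall back to VARCHAR.
        return "VARCHAR(" + Math.min(bytes, MAX_VARCHAR_SIZE) + ")";
    }

    /** Returns the StarRocks type that an upstream VARCHAR(n) would map to. */
    static String mapVarchar(int n, int unicodeCharMaxBytes) {
        long bytes = (long) n * unicodeCharMaxBytes;
        return "VARCHAR(" + Math.min(bytes, MAX_VARCHAR_SIZE) + ")";
    }

    public static void main(String[] args) {
        System.out.println(mapChar(64, 3, false));  // CHAR(192)
        System.out.println(mapChar(64, 4, false));  // VARCHAR(256)
        System.out.println(mapChar(10, 3, true));   // VARCHAR(30)
        System.out.println(mapVarchar(400000, 4));  // VARCHAR(1048576)
    }
}
```

Note how CHAR(64) stays a CHAR at 3 bytes per character but becomes VARCHAR(256) at 4, because 256 exceeds the 255-byte CHAR limit.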

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java

Lines changed: 1 addition & 0 deletions
@@ -182,6 +182,7 @@ public Set<ConfigOption<?>> optionalOptions() {
         optionalOptions.add(StarRocksDataSinkOptions.SINK_METRIC_HISTOGRAM_WINDOW_SIZE);
         optionalOptions.add(StarRocksDataSinkOptions.TABLE_CREATE_NUM_BUCKETS);
         optionalOptions.add(StarRocksDataSinkOptions.TABLE_SCHEMA_CHANGE_TIMEOUT);
+        optionalOptions.add(StarRocksDataSinkOptions.UNICODE_CHAR_MAX_BYTES);
         return optionalOptions;
     }
 }

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkOptions.java

Lines changed: 10 additions & 0 deletions
@@ -157,4 +157,14 @@ public class StarRocksDataSinkOptions {
                             "Timeout for a schema change on StarRocks side, and must be an integral multiple of "
                                     + "seconds. StarRocks will cancel the schema change after timeout which will "
                                     + "cause the sink failure.");
+
+    public static final ConfigOption<Integer> UNICODE_CHAR_MAX_BYTES =
+            ConfigOptions.key("unicode-char.max-bytes")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "Specifies how many bytes are allocated for each upstream character when mapping "
+                                    + "CHAR and VARCHAR types to StarRocks, whose length is measured in bytes. "
+                                    + "Valid values are within [1, 4]. If the upstream source uses utf8mb4, "
+                                    + "set this option to 4 to avoid underestimating column lengths.");
 }
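
To illustrate why the option description recommends 4 for utf8mb4 sources, here is a small sketch using only the JDK. It shows that a character outside the Basic Multilingual Plane takes four bytes in UTF-8, so a budget of three bytes per character can be too small. The class `Utf8WidthSketch` and its methods are hypothetical, and it assumes the upstream length counts Unicode code points.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: UTF-8 byte width versus character count. */
final class Utf8WidthSketch {
    /** Number of bytes the string occupies when encoded as UTF-8. */
    static int utf8Bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    /** Number of Unicode code points (assumed here to be how the upstream counts length). */
    static int codePoints(String s) {
        return s.codePointCount(0, s.length());
    }

    /** True if the value fits a byte budget of declaredChars * unicodeCharMaxBytes. */
    static boolean fits(String value, int declaredChars, int unicodeCharMaxBytes) {
        return utf8Bytes(value) <= (long) declaredChars * unicodeCharMaxBytes;
    }

    public static void main(String[] args) {
        String ascii = "a";
        String cjk = "\u4E2D";          // U+4E2D, a CJK character
        String emoji = "\uD83D\uDE00";  // U+1F600, written as a surrogate pair

        System.out.println(utf8Bytes(ascii)); // 1
        System.out.println(utf8Bytes(cjk));   // 3
        System.out.println(utf8Bytes(emoji)); // 4

        // Two emoji are 2 characters but 8 bytes: too long for a 2 * 3 budget.
        String twoEmoji = emoji + emoji;
        System.out.println(codePoints(twoEmoji)); // 2
        System.out.println(fits(twoEmoji, 2, 3)); // false
        System.out.println(fits(twoEmoji, 2, 4)); // true
    }
}
```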

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java

Lines changed: 6 additions & 2 deletions
@@ -162,7 +162,7 @@ private void applyAddColumn(AddColumnEvent addColumnEvent) throws SchemaEvolveEx
                             .setDefaultValue(
                                     StarRocksUtils.convertInvalidTimestampDefaultValue(
                                             column.getDefaultValueExpression(), column.getType()));
-            toStarRocksDataType(column, false, builder);
+            toStarRocksDataType(column, false, builder, tableCreateConfig.getUnicodeCharMaxBytes());
             addColumns.add(builder.build());
         }
 
@@ -321,7 +321,11 @@ private void applyAlterColumnType(AlterColumnTypeEvent event) throws SchemaEvolv
         for (Map.Entry<String, DataType> entry : typeMapping.entrySet()) {
             StarRocksColumn.Builder builder =
                     new StarRocksColumn.Builder().setColumnName(entry.getKey());
-            toStarRocksDataType(entry.getValue(), false, builder);
+            toStarRocksDataType(
+                    entry.getValue(),
+                    false,
+                    builder,
+                    tableCreateConfig.getUnicodeCharMaxBytes());
             catalog.alterColumnType(
                     tableId.getSchemaName(), tableId.getTableName(), builder.build());
         }

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java

Lines changed: 51 additions & 13 deletions
@@ -95,7 +95,11 @@ public static StarRocksTable toStarRocksTable(
                             .setDefaultValue(
                                     convertInvalidTimestampDefaultValue(
                                             column.getDefaultValueExpression(), column.getType()));
-            toStarRocksDataType(column, i < primaryKeyCount, builder);
+            toStarRocksDataType(
+                    column,
+                    i < primaryKeyCount,
+                    builder,
+                    tableCreateConfig.getUnicodeCharMaxBytes());
             starRocksColumns.add(builder.build());
         }
 
@@ -119,14 +123,38 @@ public static StarRocksTable toStarRocksTable(
     /** Convert CDC data type to StarRocks data type. */
     public static void toStarRocksDataType(
             DataType cdcDataType, boolean isPrimaryKeys, StarRocksColumn.Builder builder) {
+        toStarRocksDataType(
+                cdcDataType,
+                isPrimaryKeys,
+                builder,
+                StarRocksDataSinkOptions.UNICODE_CHAR_MAX_BYTES.defaultValue());
+    }
+
+    public static void toStarRocksDataType(
+            DataType cdcDataType,
+            boolean isPrimaryKeys,
+            StarRocksColumn.Builder builder,
+            int unicodeCharMaxBytes) {
         CdcDataTypeTransformer dataTypeTransformer =
-                new CdcDataTypeTransformer(isPrimaryKeys, builder);
+                new CdcDataTypeTransformer(isPrimaryKeys, builder, unicodeCharMaxBytes);
         cdcDataType.accept(dataTypeTransformer);
     }
 
     public static void toStarRocksDataType(
             Column cdcColumn, boolean isPrimaryKeys, StarRocksColumn.Builder builder) {
-        toStarRocksDataType(cdcColumn.getType(), isPrimaryKeys, builder);
+        toStarRocksDataType(
+                cdcColumn,
+                isPrimaryKeys,
+                builder,
+                StarRocksDataSinkOptions.UNICODE_CHAR_MAX_BYTES.defaultValue());
+    }
+
+    public static void toStarRocksDataType(
+            Column cdcColumn,
+            boolean isPrimaryKeys,
+            StarRocksColumn.Builder builder,
+            int unicodeCharMaxBytes) {
+        toStarRocksDataType(cdcColumn.getType(), isPrimaryKeys, builder, unicodeCharMaxBytes);
     }
 
     /** Format DATE type data. */
@@ -297,10 +325,20 @@ public static class CdcDataTypeTransformer
 
         private final StarRocksColumn.Builder builder;
         private final boolean isPrimaryKeys;
+        private final int unicodeCharMaxBytes;
 
         public CdcDataTypeTransformer(boolean isPrimaryKeys, StarRocksColumn.Builder builder) {
+            this(
+                    isPrimaryKeys,
+                    builder,
+                    StarRocksDataSinkOptions.UNICODE_CHAR_MAX_BYTES.defaultValue());
+        }
+
+        public CdcDataTypeTransformer(
+                boolean isPrimaryKeys, StarRocksColumn.Builder builder, int unicodeCharMaxBytes) {
             this.isPrimaryKeys = isPrimaryKeys;
             this.builder = builder;
+            this.unicodeCharMaxBytes = unicodeCharMaxBytes;
         }
 
         @Override
@@ -379,13 +417,13 @@ public StarRocksColumn.Builder visit(DecimalType decimalType) {
         @Override
         public StarRocksColumn.Builder visit(CharType charType) {
             // CDC and StarRocks use different units for the length. It's the number
-            // of characters in CDC, and the number of bytes in StarRocks. One chinese
-            // character will use 3 bytes because it uses UTF-8, so the length of StarRocks
-            // char type should be three times as that of CDC char type. Specifically, if
-            // the length of StarRocks exceeds the MAX_CHAR_SIZE, map CDC char type to StarRocks
-            // varchar type
+            // of characters in CDC, and the number of bytes in StarRocks. The number
+            // of bytes needed for each character depends on the upstream encoding, so
+            // the length of StarRocks char type should be scaled by unicodeCharMaxBytes.
+            // Specifically, if the length of StarRocks exceeds the MAX_CHAR_SIZE, map
+            // CDC char type to StarRocks varchar type.
             int length = charType.getLength();
-            long starRocksLength = length * 3L;
+            long starRocksLength = (long) length * unicodeCharMaxBytes;
             // In the StarRocks, The primary key columns can be any of the following data types:
             // BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, LARGEINT, STRING, VARCHAR, DATE, and
             // DATETIME, But it doesn't include CHAR. When a char type appears in the primary key of
@@ -405,11 +443,11 @@ public StarRocksColumn.Builder visit(CharType charType) {
         @Override
         public StarRocksColumn.Builder visit(VarCharType varCharType) {
             // CDC and StarRocks use different units for the length. It's the number
-            // of characters in CDC, and the number of bytes in StarRocks. One chinese
-            // character will use 3 bytes because it uses UTF-8, so the length of StarRocks
-            // varchar type should be three times as that of CDC varchar type.
+            // of characters in CDC, and the number of bytes in StarRocks. The number
+            // of bytes needed for each character depends on the upstream encoding, so
+            // the length of StarRocks varchar type should be scaled by unicodeCharMaxBytes.
             int length = varCharType.getLength();
-            long starRocksLength = length * 3L;
+            long starRocksLength = (long) length * unicodeCharMaxBytes;
             builder.setDataType(VARCHAR);
             builder.setNullable(varCharType.isNullable());
             builder.setColumnSize((int) Math.min(starRocksLength, MAX_VARCHAR_SIZE));

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/TableCreateConfig.java

Lines changed: 29 additions & 1 deletion
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.connectors.starrocks.sink;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.utils.Preconditions;
 
 import javax.annotation.Nullable;
 
@@ -44,9 +45,29 @@ public class TableCreateConfig implements Serializable {
     /** Properties for the table. */
     private final Map<String, String> properties;
 
+    /** Maximum number of bytes a single character can take in UTF-8 encoding. */
+    public static final int MAX_UNICODE_CHAR_BYTES = 4;
+
+    /** Max bytes allocated for each upstream Unicode character during schema mapping. */
+    private final int unicodeCharMaxBytes;
+
     public TableCreateConfig(@Nullable Integer numBuckets, Map<String, String> properties) {
+        this(
+                numBuckets,
+                properties,
+                StarRocksDataSinkOptions.UNICODE_CHAR_MAX_BYTES.defaultValue());
+    }
+
+    public TableCreateConfig(
+            @Nullable Integer numBuckets, Map<String, String> properties, int unicodeCharMaxBytes) {
+        Preconditions.checkArgument(
+                unicodeCharMaxBytes >= 1 && unicodeCharMaxBytes <= MAX_UNICODE_CHAR_BYTES,
+                "unicode-char.max-bytes must be between 1 and %s, but actually is %s",
+                MAX_UNICODE_CHAR_BYTES,
+                unicodeCharMaxBytes);
         this.numBuckets = numBuckets;
         this.properties = new HashMap<>(properties);
+        this.unicodeCharMaxBytes = unicodeCharMaxBytes;
     }
 
     public Optional<Integer> getNumBuckets() {
@@ -57,6 +78,10 @@ public Map<String, String> getProperties() {
         return Collections.unmodifiableMap(properties);
     }
 
+    public int getUnicodeCharMaxBytes() {
+        return unicodeCharMaxBytes;
+    }
+
     public static TableCreateConfig from(Configuration config) {
         Integer numBuckets = config.get(StarRocksDataSinkOptions.TABLE_CREATE_NUM_BUCKETS);
         Map<String, String> tableProperties =
@@ -77,6 +102,9 @@ public static TableCreateConfig from(Configuration config) {
                                                 .length())
                                         .toLowerCase(),
                                 Map.Entry::getValue));
-        return new TableCreateConfig(numBuckets, tableProperties);
+        return new TableCreateConfig(
+                numBuckets,
+                tableProperties,
+                config.get(StarRocksDataSinkOptions.UNICODE_CHAR_MAX_BYTES));
     }
 }
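
The range check added to the `TableCreateConfig` constructor can be approximated without Flink on the classpath. The sketch below is a plain-JDK stand-in, assuming that a failed `Preconditions.checkArgument` surfaces as an `IllegalArgumentException`; the class `UnicodeCharMaxBytesCheck` and the method `requireValid` are hypothetical names.

```java
/** Hypothetical plain-JDK stand-in for the constructor's range check. */
final class UnicodeCharMaxBytesCheck {
    // Upper bound from the diff: a UTF-8 character takes at most 4 bytes.
    static final int MAX_UNICODE_CHAR_BYTES = 4;

    /** Returns the value unchanged if it lies in [1, 4]; otherwise throws. */
    static int requireValid(int unicodeCharMaxBytes) {
        if (unicodeCharMaxBytes < 1 || unicodeCharMaxBytes > MAX_UNICODE_CHAR_BYTES) {
            throw new IllegalArgumentException(
                    String.format(
                            "unicode-char.max-bytes must be between 1 and %d, but actually is %d",
                            MAX_UNICODE_CHAR_BYTES, unicodeCharMaxBytes));
        }
        return unicodeCharMaxBytes;
    }

    public static void main(String[] args) {
        System.out.println(requireValid(3)); // default value passes
        System.out.println(requireValid(4)); // recommended for utf8mb4 sources
        try {
            requireValid(5);
        } catch (IllegalArgumentException e) {
            // Out-of-range values are rejected before any table is created.
            System.out.println(e.getMessage());
        }
    }
}
```

Validating in the constructor means a misconfigured value fails when the config object is built rather than later during schema mapping.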
