26 changes: 15 additions & 11 deletions docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,13 @@ pipeline:
<td>Duration</td>
<td>Timeout for a schema change on the StarRocks side; it must be an integral multiple of seconds. After the timeout, StarRocks cancels the schema change, which causes the job to fail.</td>
</tr>
<tr>
<td>unicode-char.max-bytes</td>
<td>optional</td>
<td style="word-wrap: break-word;">3</td>
<td>Integer</td>
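<td>The maximum number of bytes allocated for each character when mapping upstream CHAR and VARCHAR types to StarRocks. Because StarRocks measures length in bytes, setting this option to 4 is recommended if the upstream uses utf8mb4, to avoid underestimating the target column length. The default remains 3 for backward compatibility.</td>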
</tr>
<tr>
<td>sink.socket.timeout-ms</td>
<td>optional</td>
Expand Down Expand Up @@ -320,22 +327,19 @@ pipeline:
<td></td>
</tr>
<tr>
<td>CHAR(n) where n <= 85</td>
<td>CHAR(n * 3)</td>
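<td>CDC defines the length in characters, whereas StarRocks defines it in bytes. In UTF-8, one Chinese character takes three bytes, so a CDC length of n corresponds to
n * 3 in StarRocks. Because the maximum length of StarRocks CHAR is 255, CDC CHAR is mapped to StarRocks CHAR only when the CDC length does not exceed 85.</td>
<td>CHAR(n) where n * unicode-char.max-bytes <= 255 and the column is not a primary key column</td>
<td>CHAR(n * unicode-char.max-bytes)</td>
<td>CDC defines the length in characters, whereas StarRocks defines it in bytes. The StarRocks length is calculated as n * unicode-char.max-bytes. Because the maximum length of StarRocks CHAR is 255, CDC CHAR is mapped to StarRocks CHAR only when the calculated length does not exceed 255. If the column is a primary key column, it is mapped to VARCHAR instead.</td>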
</tr>
<tr>
<td>CHAR(n) where n > 85</td>
<td>VARCHAR(n * 3)</td>
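<td>CDC defines the length in characters, whereas StarRocks defines it in bytes. In UTF-8, one Chinese character takes three bytes, so a CDC length of n corresponds to
n * 3 in StarRocks. Because the maximum length of StarRocks CHAR is 255, CDC CHAR is mapped to StarRocks VARCHAR when the CDC length exceeds 85.</td>
<td>CHAR(n) where n * unicode-char.max-bytes > 255, or the column is a primary key column</td>
<td>VARCHAR(min(n * unicode-char.max-bytes, 1048576))</td>
<td>CDC defines the length in characters, whereas StarRocks defines it in bytes. The StarRocks length is calculated as n * unicode-char.max-bytes. Because the maximum length of StarRocks CHAR is 255, CDC CHAR is mapped to StarRocks VARCHAR when the calculated length exceeds 255. Primary key CHAR columns are also mapped to VARCHAR.</td>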
</tr>
<tr>
<td>VARCHAR(n)</td>
<td>VARCHAR(n * 3)</td>
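<td>CDC defines the length in characters, whereas StarRocks defines it in bytes. In UTF-8, one Chinese character takes three bytes, so a CDC length of n corresponds to
n * 3 in StarRocks.</td>
<td>VARCHAR(min(n * unicode-char.max-bytes, 1048576))</td>
<td>CDC defines the length in characters, whereas StarRocks defines it in bytes. The StarRocks length is calculated as n * unicode-char.max-bytes and never exceeds 1048576.</td>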
</tr>
<tr>
<td>BINARY(n)</td>
Expand Down
28 changes: 15 additions & 13 deletions docs/content/docs/connectors/pipeline-connectors/starrocks.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,13 @@ pipeline:
seconds. StarRocks will cancel the schema change after timeout which will
cause the sink failure. </td>
</tr>
<tr>
<td>unicode-char.max-bytes</td>
<td>optional</td>
<td style="word-wrap: break-word;">3</td>
<td>Integer</td>
<td>The maximum number of bytes allocated for each upstream character when mapping CHAR and VARCHAR types to StarRocks, whose length is measured in bytes. Valid values are within [1, 4]. If the upstream source uses utf8mb4, set this option to 4 to avoid underestimating column lengths. The default value of 3 is retained for backward compatibility.</td>
</tr>
<tr>
<td>sink.socket.timeout-ms</td>
<td>optional</td>
Expand Down Expand Up @@ -329,24 +336,19 @@ pipeline:
<td></td>
</tr>
<tr>
<td>CHAR(n) where n <= 85</td>
<td>CHAR(n * 3)</td>
<td>CDC defines the length by characters, and StarRocks defines it by bytes. According to UTF-8, one Chinese
character is equal to three bytes, so the length for StarRocks is n * 3. Because the max length of StarRocks
CHAR is 255, map CDC CHAR to StarRocks CHAR only when the CDC length is no larger than 85.</td>
<td>CHAR(n) where n * unicode-char.max-bytes <= 255 and the column is not part of the primary key</td>
<td>CHAR(n * unicode-char.max-bytes)</td>
<td>CDC defines the length in characters, whereas StarRocks defines it in bytes. The StarRocks length is calculated as n * unicode-char.max-bytes. Because the maximum length of StarRocks CHAR is 255, CDC CHAR is mapped to StarRocks CHAR only when the calculated length is no larger than 255. If the column is part of the primary key, it is mapped to VARCHAR instead.</td>
</tr>
<tr>
<td>CHAR(n) where n > 85</td>
<td>VARCHAR(n * 3)</td>
<td>CDC defines the length by characters, and StarRocks defines it by bytes. According to UTF-8, one Chinese
character is equal to three bytes, so the length for StarRocks is n * 3. Because the max length of StarRocks
CHAR is 255, map CDC CHAR to StarRocks VARCHAR if the CDC length is larger than 85.</td>
<td>CHAR(n) where n * unicode-char.max-bytes > 255, or the column is part of the primary key</td>
<td>VARCHAR(min(n * unicode-char.max-bytes, 1048576))</td>
<td>CDC defines the length in characters, whereas StarRocks defines it in bytes. The StarRocks length is calculated as n * unicode-char.max-bytes. Because the maximum length of StarRocks CHAR is 255, CDC CHAR is mapped to StarRocks VARCHAR when the calculated length exceeds 255. Primary key CHAR columns are also mapped to VARCHAR.</td>
</tr>
<tr>
<td>VARCHAR(n)</td>
<td>VARCHAR(n * 3)</td>
<td>CDC defines the length by characters, and StarRocks defines it by bytes. According to UTF-8, one Chinese
character is equal to three bytes, so the length for StarRocks is n * 3.</td>
<td>VARCHAR(min(n * unicode-char.max-bytes, 1048576))</td>
<td>CDC defines the length by characters, and StarRocks defines it by bytes. The StarRocks length is calculated as n * unicode-char.max-bytes and capped at 1048576.</td>
</tr>
<tr>
<td>BINARY(n)</td>
Expand Down
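The mapping rules in the updated table can be sketched as a small standalone helper. This is only an illustration of the documented rules, not the connector's code: the class `StarRocksLengthMappingSketch` and its methods are hypothetical, and the limits 255 and 1048576 are taken from the table above.

```java
/** Hypothetical helper mirroring the documented CHAR/VARCHAR mapping rules. */
final class StarRocksLengthMappingSketch {
    static final int MAX_CHAR_SIZE = 255;        // StarRocks CHAR limit in bytes (from the docs table)
    static final int MAX_VARCHAR_SIZE = 1048576; // StarRocks VARCHAR limit in bytes (from the docs table)

    /** Maps an upstream CHAR(n) to a StarRocks type string. */
    static String mapChar(int n, int unicodeCharMaxBytes, boolean isPrimaryKey) {
        // Widen to long before multiplying so very large n cannot overflow.
        long bytes = (long) n * unicodeCharMaxBytes;
        if (bytes <= MAX_CHAR_SIZE && !isPrimaryKey) {
            return "CHAR(" + bytes + ")";
        }
        // Too long for CHAR, or a primary key column: fall back to VARCHAR.
        return "VARCHAR(" + Math.min(bytes, MAX_VARCHAR_SIZE) + ")";
    }

    /** Maps an upstream VARCHAR(n) to a StarRocks type string. */
    static String mapVarChar(int n, int unicodeCharMaxBytes) {
        long bytes = (long) n * unicodeCharMaxBytes;
        return "VARCHAR(" + Math.min(bytes, MAX_VARCHAR_SIZE) + ")";
    }
}
```

With the default of 3, `CHAR(85)` still fits into `CHAR(255)`. With 4, the largest upstream CHAR that stays a CHAR is `CHAR(63)`, because 64 * 4 = 256 exceeds the limit.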
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public Set<ConfigOption<?>> optionalOptions() {
optionalOptions.add(StarRocksDataSinkOptions.SINK_METRIC_HISTOGRAM_WINDOW_SIZE);
optionalOptions.add(StarRocksDataSinkOptions.TABLE_CREATE_NUM_BUCKETS);
optionalOptions.add(StarRocksDataSinkOptions.TABLE_SCHEMA_CHANGE_TIMEOUT);
optionalOptions.add(StarRocksDataSinkOptions.UNICODE_CHAR_MAX_BYTES);
return optionalOptions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,14 @@ public class StarRocksDataSinkOptions {
"Timeout for a schema change on StarRocks side, and must be an integral multiple of "
+ "seconds. StarRocks will cancel the schema change after timeout which will "
+ "cause the sink failure.");

public static final ConfigOption<Integer> UNICODE_CHAR_MAX_BYTES =
ConfigOptions.key("unicode-char.max-bytes")
.intType()
.defaultValue(3)
.withDescription(
"Specifies how many bytes are allocated for each upstream character when mapping "
+ "CHAR and VARCHAR types to StarRocks, whose length is measured in bytes. "
+ "Valid values are within [1, 4]. If the upstream source uses utf8mb4, "
+ "set this option to 4 to avoid underestimating column lengths.");
}
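To see why a value of 3 can underestimate column lengths for utf8mb4 sources, the sketch below compares the character count of a string with its UTF-8 byte length. It assumes that the upstream CHAR/VARCHAR length counts Unicode code points; the class `Utf8WidthSketch` and its method names are hypothetical and not part of the connector.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical demo: characters vs. UTF-8 bytes for column sizing. */
final class Utf8WidthSketch {
    /** Number of Unicode code points (assumed to be what an upstream CHAR(n) counts). */
    static int charCount(String s) {
        return s.codePointCount(0, s.length());
    }

    /** Number of bytes the string occupies when encoded as UTF-8. */
    static int utf8Bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    /** True if the value fits a byte-sized column of declaredChars * maxBytesPerChar. */
    static boolean fits(String s, int declaredChars, int maxBytesPerChar) {
        return utf8Bytes(s) <= (long) declaredChars * maxBytesPerChar;
    }
}
```

A CJK character such as `"\u4e2d"` is one character and three bytes, so it fits under the default. An emoji such as `"\uD83D\uDE00"` (U+1F600) is one character but four bytes, so an upstream `CHAR(1)` holding it only fits when the option is set to 4.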
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private void applyAddColumn(AddColumnEvent addColumnEvent) throws SchemaEvolveEx
.setDefaultValue(
StarRocksUtils.convertInvalidTimestampDefaultValue(
column.getDefaultValueExpression(), column.getType()));
toStarRocksDataType(column, false, builder);
toStarRocksDataType(column, false, builder, tableCreateConfig.getUnicodeCharMaxBytes());
addColumns.add(builder.build());
}

Expand Down Expand Up @@ -321,7 +321,11 @@ private void applyAlterColumnType(AlterColumnTypeEvent event) throws SchemaEvolv
for (Map.Entry<String, DataType> entry : typeMapping.entrySet()) {
StarRocksColumn.Builder builder =
new StarRocksColumn.Builder().setColumnName(entry.getKey());
toStarRocksDataType(entry.getValue(), false, builder);
toStarRocksDataType(
entry.getValue(),
false,
builder,
tableCreateConfig.getUnicodeCharMaxBytes());
catalog.alterColumnType(
tableId.getSchemaName(), tableId.getTableName(), builder.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ public static StarRocksTable toStarRocksTable(
.setDefaultValue(
convertInvalidTimestampDefaultValue(
column.getDefaultValueExpression(), column.getType()));
toStarRocksDataType(column, i < primaryKeyCount, builder);
toStarRocksDataType(
column,
i < primaryKeyCount,
builder,
tableCreateConfig.getUnicodeCharMaxBytes());
starRocksColumns.add(builder.build());
}

Expand All @@ -119,14 +123,38 @@ public static StarRocksTable toStarRocksTable(
/** Convert CDC data type to StarRocks data type. */
public static void toStarRocksDataType(
DataType cdcDataType, boolean isPrimaryKeys, StarRocksColumn.Builder builder) {
toStarRocksDataType(
cdcDataType,
isPrimaryKeys,
builder,
StarRocksDataSinkOptions.UNICODE_CHAR_MAX_BYTES.defaultValue());
}

public static void toStarRocksDataType(
DataType cdcDataType,
boolean isPrimaryKeys,
StarRocksColumn.Builder builder,
int unicodeCharMaxBytes) {
CdcDataTypeTransformer dataTypeTransformer =
new CdcDataTypeTransformer(isPrimaryKeys, builder);
new CdcDataTypeTransformer(isPrimaryKeys, builder, unicodeCharMaxBytes);
cdcDataType.accept(dataTypeTransformer);
}

public static void toStarRocksDataType(
Column cdcColumn, boolean isPrimaryKeys, StarRocksColumn.Builder builder) {
toStarRocksDataType(cdcColumn.getType(), isPrimaryKeys, builder);
toStarRocksDataType(
cdcColumn,
isPrimaryKeys,
builder,
StarRocksDataSinkOptions.UNICODE_CHAR_MAX_BYTES.defaultValue());
}

public static void toStarRocksDataType(
Column cdcColumn,
boolean isPrimaryKeys,
StarRocksColumn.Builder builder,
int unicodeCharMaxBytes) {
toStarRocksDataType(cdcColumn.getType(), isPrimaryKeys, builder, unicodeCharMaxBytes);
}

/** Format DATE type data. */
Expand Down Expand Up @@ -297,10 +325,20 @@ public static class CdcDataTypeTransformer

private final StarRocksColumn.Builder builder;
private final boolean isPrimaryKeys;
private final int unicodeCharMaxBytes;

public CdcDataTypeTransformer(boolean isPrimaryKeys, StarRocksColumn.Builder builder) {
this(
isPrimaryKeys,
builder,
StarRocksDataSinkOptions.UNICODE_CHAR_MAX_BYTES.defaultValue());
}

public CdcDataTypeTransformer(
boolean isPrimaryKeys, StarRocksColumn.Builder builder, int unicodeCharMaxBytes) {
this.isPrimaryKeys = isPrimaryKeys;
this.builder = builder;
this.unicodeCharMaxBytes = unicodeCharMaxBytes;
}

@Override
Expand Down Expand Up @@ -379,13 +417,13 @@ public StarRocksColumn.Builder visit(DecimalType decimalType) {
@Override
public StarRocksColumn.Builder visit(CharType charType) {
// CDC and StarRocks use different units for the length. It's the number
// of characters in CDC, and the number of bytes in StarRocks. One chinese
// character will use 3 bytes because it uses UTF-8, so the length of StarRocks
// char type should be three times as that of CDC char type. Specifically, if
// the length of StarRocks exceeds the MAX_CHAR_SIZE, map CDC char type to StarRocks
// varchar type
// of characters in CDC, and the number of bytes in StarRocks. The number
// of bytes needed for each character depends on the upstream encoding, so
// the length of StarRocks char type should be scaled by unicodeCharMaxBytes.
// Specifically, if the length of StarRocks exceeds the MAX_CHAR_SIZE, map
// CDC char type to StarRocks varchar type.
int length = charType.getLength();
long starRocksLength = length * 3L;
long starRocksLength = (long) length * unicodeCharMaxBytes;
// In StarRocks, primary key columns can be any of the following data types:
// BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, LARGEINT, STRING, VARCHAR, DATE, and
// DATETIME, but not CHAR. When a char type appears in the primary key of
Expand All @@ -405,11 +443,11 @@ public StarRocksColumn.Builder visit(CharType charType) {
@Override
public StarRocksColumn.Builder visit(VarCharType varCharType) {
// CDC and StarRocks use different units for the length. It's the number
// of characters in CDC, and the number of bytes in StarRocks. One chinese
// character will use 3 bytes because it uses UTF-8, so the length of StarRocks
// varchar type should be three times as that of CDC varchar type.
// of characters in CDC, and the number of bytes in StarRocks. The number
// of bytes needed for each character depends on the upstream encoding, so
// the length of StarRocks varchar type should be scaled by unicodeCharMaxBytes.
int length = varCharType.getLength();
long starRocksLength = length * 3L;
long starRocksLength = (long) length * unicodeCharMaxBytes;
builder.setDataType(VARCHAR);
builder.setNullable(varCharType.isNullable());
builder.setColumnSize((int) Math.min(starRocksLength, MAX_VARCHAR_SIZE));
Expand Down
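The `(long)` cast in `(long) length * unicodeCharMaxBytes` matters because multiplying two `int` values wraps around before `Math.min` can cap the result. The sketch below contrasts the two variants; the class `LengthOverflowSketch` is hypothetical, and it assumes an upstream length as large as `Integer.MAX_VALUE` can reach this code.

```java
/** Hypothetical demo of why the length is widened to long before scaling. */
final class LengthOverflowSketch {
    static final int MAX_VARCHAR_SIZE = 1048576;

    /** Buggy variant: int * int overflows before the cap is applied. */
    static int columnSizeWithIntMath(int length, int unicodeCharMaxBytes) {
        int starRocksLength = length * unicodeCharMaxBytes; // may wrap around
        return Math.min(starRocksLength, MAX_VARCHAR_SIZE);
    }

    /** Safe variant: widen first, multiply in long, then cap and narrow. */
    static int columnSizeWithLongMath(int length, int unicodeCharMaxBytes) {
        long starRocksLength = (long) length * unicodeCharMaxBytes;
        return (int) Math.min(starRocksLength, MAX_VARCHAR_SIZE);
    }
}
```

For `length = Integer.MAX_VALUE` and a multiplier of 4, the `int` variant wraps to -4 and yields an invalid column size, while the `long` variant is capped at 1048576.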
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.cdc.connectors.starrocks.sink;

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.utils.Preconditions;

import javax.annotation.Nullable;

Expand All @@ -44,9 +45,29 @@ public class TableCreateConfig implements Serializable {
/** Properties for the table. */
private final Map<String, String> properties;

/** Maximum number of bytes a single character can take in UTF-8 encoding. */
public static final int MAX_UNICODE_CHAR_BYTES = 4;

/** Max bytes allocated for each upstream Unicode character during schema mapping. */
private final int unicodeCharMaxBytes;

public TableCreateConfig(@Nullable Integer numBuckets, Map<String, String> properties) {
this(
numBuckets,
properties,
StarRocksDataSinkOptions.UNICODE_CHAR_MAX_BYTES.defaultValue());
}

public TableCreateConfig(
@Nullable Integer numBuckets, Map<String, String> properties, int unicodeCharMaxBytes) {
Preconditions.checkArgument(
unicodeCharMaxBytes >= 1 && unicodeCharMaxBytes <= MAX_UNICODE_CHAR_BYTES,
"unicode-char.max-bytes must be between 1 and %s, but actually is %s",
MAX_UNICODE_CHAR_BYTES,
unicodeCharMaxBytes);
this.numBuckets = numBuckets;
this.properties = new HashMap<>(properties);
this.unicodeCharMaxBytes = unicodeCharMaxBytes;
}

public Optional<Integer> getNumBuckets() {
Expand All @@ -57,6 +78,10 @@ public Map<String, String> getProperties() {
return Collections.unmodifiableMap(properties);
}

public int getUnicodeCharMaxBytes() {
return unicodeCharMaxBytes;
}

public static TableCreateConfig from(Configuration config) {
Integer numBuckets = config.get(StarRocksDataSinkOptions.TABLE_CREATE_NUM_BUCKETS);
Map<String, String> tableProperties =
Expand All @@ -77,6 +102,9 @@ public static TableCreateConfig from(Configuration config) {
.length())
.toLowerCase(),
Map.Entry::getValue));
return new TableCreateConfig(numBuckets, tableProperties);
return new TableCreateConfig(
numBuckets,
tableProperties,
config.get(StarRocksDataSinkOptions.UNICODE_CHAR_MAX_BYTES));
}
}
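The range check added to the constructor can be illustrated in isolation with plain Java. This is a minimal sketch, not the connector's code: `UnicodeCharMaxBytesCheckSketch` and `validate` are hypothetical names, and the sketch throws `IllegalArgumentException` directly instead of going through `Preconditions.checkArgument`.

```java
/** Hypothetical standalone version of the [1, 4] range check. */
final class UnicodeCharMaxBytesCheckSketch {
    /** Upper bound: a UTF-8 encoded character takes at most 4 bytes. */
    static final int MAX_UNICODE_CHAR_BYTES = 4;

    /** Returns the value unchanged if it is within [1, 4], otherwise throws. */
    static int validate(int unicodeCharMaxBytes) {
        if (unicodeCharMaxBytes < 1 || unicodeCharMaxBytes > MAX_UNICODE_CHAR_BYTES) {
            throw new IllegalArgumentException(
                    String.format(
                            "unicode-char.max-bytes must be between 1 and %s, but actually is %s",
                            MAX_UNICODE_CHAR_BYTES, unicodeCharMaxBytes));
        }
        return unicodeCharMaxBytes;
    }
}
```

Checking the value once at construction time means a misconfigured pipeline fails at startup rather than producing columns with unexpected sizes later during schema evolution.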