Skip to content

Commit ce261ba

Browse files
committed
[FLINK-39833][pipeline][starrocks] Escape special characters in column comment and default value when building DDL
1 parent 7355d76 commit ce261ba

3 files changed

Lines changed: 46 additions & 9 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,11 +191,18 @@ private String buildColumnStmt(StarRocksColumn column) {
191191
builder.append(" ");
192192
builder.append(column.isNullable() ? "NULL" : "NOT NULL");
193193
if (column.getDefaultValue().isPresent()) {
194-
builder.append(String.format(" DEFAULT \"%s\"", column.getDefaultValue().get()));
194+
builder.append(
195+
String.format(
196+
" DEFAULT \"%s\"",
197+
StarRocksUtils.escapeSqlStringLiteral(column.getDefaultValue().get())));
195198
}
196199

197200
if (column.getColumnComment().isPresent()) {
198-
builder.append(String.format(" COMMENT \"%s\"", column.getColumnComment().get()));
201+
builder.append(
202+
String.format(
203+
" COMMENT \"%s\"",
204+
StarRocksUtils.escapeSqlStringLiteral(
205+
column.getColumnComment().get())));
199206
}
200207
return builder.toString();
201208
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,10 +158,13 @@ private void applyAddColumn(AddColumnEvent addColumnEvent) throws SchemaEvolveEx
158158
new StarRocksColumn.Builder()
159159
.setColumnName(column.getName())
160160
.setOrdinalPosition(-1)
161-
.setColumnComment(column.getComment())
161+
.setColumnComment(
162+
StarRocksUtils.escapeSqlStringLiteral(column.getComment()))
162163
.setDefaultValue(
163-
StarRocksUtils.convertInvalidTimestampDefaultValue(
164-
column.getDefaultValueExpression(), column.getType()));
164+
StarRocksUtils.escapeSqlStringLiteral(
165+
StarRocksUtils.convertInvalidTimestampDefaultValue(
166+
column.getDefaultValueExpression(),
167+
column.getType())));
165168
toStarRocksDataType(column, false, builder);
166169
addColumns.add(builder.build());
167170
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,12 @@ public static StarRocksTable toStarRocksTable(
9191
new StarRocksColumn.Builder()
9292
.setColumnName(column.getName())
9393
.setOrdinalPosition(i)
94-
.setColumnComment(column.getComment())
94+
.setColumnComment(escapeSqlStringLiteral(column.getComment()))
9595
.setDefaultValue(
96-
convertInvalidTimestampDefaultValue(
97-
column.getDefaultValueExpression(), column.getType()));
96+
escapeSqlStringLiteral(
97+
convertInvalidTimestampDefaultValue(
98+
column.getDefaultValueExpression(),
99+
column.getType())));
98100
toStarRocksDataType(column, i < primaryKeyCount, builder);
99101
starRocksColumns.add(builder.build());
100102
}
@@ -108,7 +110,7 @@ public static StarRocksTable toStarRocksTable(
108110
.setTableKeys(schema.primaryKeys())
109111
// use primary keys as distribution keys by default
110112
.setDistributionKeys(schema.primaryKeys())
111-
.setComment(schema.comment());
113+
.setComment(escapeSqlStringLiteral(schema.comment()));
112114
if (tableCreateConfig.getNumBuckets().isPresent()) {
113115
tableBuilder.setNumBuckets(tableCreateConfig.getNumBuckets().get());
114116
}
@@ -492,4 +494,29 @@ public static String convertInvalidTimestampDefaultValue(
492494

493495
return defaultValue;
494496
}
497+
498+
public static String escapeSqlStringLiteral(String value) {
499+
if (value == null) {
500+
return null;
501+
}
502+
StringBuilder sb = new StringBuilder(value.length() + 8);
503+
for (int i = 0; i < value.length(); i++) {
504+
char c = value.charAt(i);
505+
switch (c) {
506+
case '\\':
507+
sb.append("\\\\");
508+
break;
509+
case '"':
510+
sb.append("\"\"");
511+
break;
512+
case '\n':
513+
case '\r':
514+
sb.append(' ');
515+
break;
516+
default:
517+
sb.append(c);
518+
}
519+
}
520+
return sb.toString();
521+
}
495522
}

0 commit comments

Comments
 (0)