diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java index 70f1f50f7eb..cc55d235a5e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java @@ -23,12 +23,15 @@ import com.starrocks.connector.flink.catalog.StarRocksCatalog; import com.starrocks.connector.flink.catalog.StarRocksCatalogException; import com.starrocks.connector.flink.catalog.StarRocksColumn; +import com.starrocks.connector.flink.catalog.StarRocksTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; /** An enriched {@code StarRocksCatalog} with more schema evolution abilities. 
*/ public class StarRocksEnrichedCatalog extends StarRocksCatalog { @@ -38,6 +41,70 @@ public StarRocksEnrichedCatalog(String jdbcUrl, String username, String password private static final Logger LOG = LoggerFactory.getLogger(StarRocksEnrichedCatalog.class); + @Override + public void createTable(StarRocksTable table, boolean ignoreIfExists) + throws StarRocksCatalogException { + String createTableSql = buildCreateTableSql(table, ignoreIfExists); + try { + executeUpdateStatement(createTableSql); + LOG.info( + "Success to create table {}.{}, sql: {}", + table.getDatabaseName(), + table.getTableName(), + createTableSql); + } catch (Exception e) { + LOG.error( + "Failed to create table {}.{}, sql: {}", + table.getDatabaseName(), + table.getTableName(), + createTableSql, + e); + throw new StarRocksCatalogException( + String.format( + "Failed to create table %s.%s", + table.getDatabaseName(), table.getTableName()), + e); + } + } + + @Override + public void alterAddColumns( + String databaseName, + String tableName, + List<StarRocksColumn> addColumns, + long timeoutSecond) + throws StarRocksCatalogException { + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "database name cannot be null or empty."); + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(tableName), + "table name cannot be null or empty."); + Preconditions.checkArgument(!addColumns.isEmpty(), "Added columns should not be empty."); + + String alterSql = + buildAlterAddColumnsSql(databaseName, tableName, addColumns, timeoutSecond); + try { + long startTimeMillis = System.currentTimeMillis(); + executeAlter(databaseName, tableName, alterSql, timeoutSecond); + LOG.info( + "Success to add columns to {}.{}, duration: {}ms, sql: {}", + databaseName, + tableName, + System.currentTimeMillis() - startTimeMillis, + alterSql); + } catch (Exception e) { + LOG.error( + "Failed to add columns to {}.{}, sql: {}", + databaseName, + tableName, + alterSql, + e); + throw new 
StarRocksCatalogException( + String.format("Failed to add columns to %s.%s ", databaseName, tableName), e); + } + } + public void truncateTable(String databaseName, String tableName) throws StarRocksCatalogException { checkTableArgument(databaseName, tableName); @@ -137,6 +204,80 @@ public void alterColumnType(String databaseName, String tableName, StarRocksColu } } + private String buildAlterAddColumnsSql( + String databaseName, + String tableName, + List<StarRocksColumn> addColumns, + long timeoutSecond) { + StringBuilder builder = new StringBuilder(); + builder.append(String.format("ALTER TABLE `%s`.`%s` ", databaseName, tableName)); + String columnsStmt = + addColumns.stream() + .map(col -> "ADD COLUMN " + buildColumnStmt(col)) + .collect(Collectors.joining(", ")); + builder.append(columnsStmt); + builder.append(String.format(" PROPERTIES (\"timeout\" = \"%s\")", timeoutSecond)); + builder.append(";"); + return builder.toString(); + } + + private String buildCreateTableSql(StarRocksTable table, boolean ignoreIfExists) { + StringBuilder builder = new StringBuilder(); + builder.append( + String.format( + "CREATE TABLE %s`%s`.`%s`", + ignoreIfExists ? 
"IF NOT EXISTS " : "", + table.getDatabaseName(), + table.getTableName())); + builder.append(" (\n"); + String columnsStmt = + table.getColumns().stream() + .map(this::buildColumnStmt) + .collect(Collectors.joining(",\n")); + builder.append(columnsStmt); + builder.append("\n) "); + + Preconditions.checkArgument( + table.getTableType() == StarRocksTable.TableType.PRIMARY_KEY, + "Not support to build create table sql for table type " + table.getTableType()); + Preconditions.checkArgument( + table.getTableKeys().isPresent(), + "Can't build create table sql because there is no table keys"); + String tableKeys = + table.getTableKeys().get().stream() + .map(key -> "`" + key + "`") + .collect(Collectors.joining(", ")); + builder.append(String.format("PRIMARY KEY (%s)\n", tableKeys)); + + Preconditions.checkArgument( + table.getDistributionKeys().isPresent(), + "Can't build create table sql because there is no distribution keys"); + String distributionKeys = + table.getDistributionKeys().get().stream() + .map(key -> "`" + key + "`") + .collect(Collectors.joining(", ")); + builder.append(String.format("DISTRIBUTED BY HASH (%s)", distributionKeys)); + if (table.getNumBuckets().isPresent()) { + builder.append(" BUCKETS "); + builder.append(table.getNumBuckets().get()); + } + if (!table.getProperties().isEmpty()) { + builder.append("\nPROPERTIES (\n"); + String properties = + table.getProperties().entrySet().stream() + .map( + entry -> + String.format( + "\"%s\" = \"%s\"", + entry.getKey(), entry.getValue())) + .collect(Collectors.joining(",\n")); + builder.append(properties); + builder.append("\n)"); + } + builder.append(";"); + return builder.toString(); + } + private String buildTruncateTableSql(String databaseName, String tableName) { return String.format("TRUNCATE TABLE `%s`.`%s`;", databaseName, tableName); } @@ -171,6 +312,26 @@ private void executeUpdateStatement(String sql) throws StarRocksCatalogException } } + private void executeAlter( + String databaseName, String 
tableName, String alterSql, long timeoutSecond) + throws StarRocksCatalogException { + try { + Method m = + getClass() + .getSuperclass() + .getDeclaredMethod( + "executeAlter", + String.class, + String.class, + String.class, + long.class); + m.setAccessible(true); + m.invoke(this, databaseName, tableName, alterSql, timeoutSecond); + } catch (InvocationTargetException | NoSuchMethodException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + private void checkTableArgument(String databaseName, String tableName) { Preconditions.checkArgument( !StringUtils.isNullOrWhitespaceOnly(databaseName), @@ -191,15 +352,25 @@ private String buildColumnStmt(StarRocksColumn column) { builder.append(" "); builder.append(column.isNullable() ? "NULL" : "NOT NULL"); if (column.getDefaultValue().isPresent()) { - builder.append(String.format(" DEFAULT \"%s\"", column.getDefaultValue().get())); + builder.append( + String.format( + " DEFAULT \"%s\"", + escapeForDoubleQuotedSqlString(column.getDefaultValue().get()))); } if (column.getColumnComment().isPresent()) { - builder.append(String.format(" COMMENT \"%s\"", column.getColumnComment().get())); + builder.append( + String.format( + " COMMENT \"%s\"", + escapeForDoubleQuotedSqlString(column.getColumnComment().get()))); } return builder.toString(); } + private String escapeForDoubleQuotedSqlString(String value) { + return value.replace("\\", "\\\\").replace("\"", "\\\""); + } + private String getFullColumnType( String type, Optional columnSize, Optional decimalDigits) { String dataType = type.toUpperCase(); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java index fd1d3c06845..6c6c2fe3b83 
100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java @@ -101,7 +101,9 @@ private List generateAddColumnEvents(TableId tableId) { Schema.newBuilder() .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) - .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .column( + new PhysicalColumn( + "name", DataTypes.VARCHAR(17), "\"name\"", "\"\"")) .primaryKey("id") .build(); @@ -123,9 +125,16 @@ private List generateAddColumnEvents(TableId tableId) { Collections.singletonList( new AddColumnEvent.ColumnWithPosition( new PhysicalColumn( - "extra_decimal", - DataTypes.DECIMAL(17, 0), - null))))); + "extra_decimal", DataTypes.DECIMAL(17, 0), null)))), + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn( + "extra_string", + DataTypes.VARCHAR(17), + "\"extra_string\"", + "\"\""))))); } private List generateDropColumnEvents(TableId tableId) { @@ -285,10 +294,11 @@ void testStarRocksAddColumn() throws Exception { Arrays.asList( "id | int | NO | true | null", "number | double | YES | false | null", - "name | varchar(51) | YES | false | null", + "name | varchar(51) | YES | false | \"\"", "extra_date | date | YES | false | null", "extra_bool | boolean | YES | false | null", - "extra_decimal | decimal(17,0) | YES | false | null"); + "extra_decimal | decimal(17,0) | YES | false | null", + "extra_string | varchar(51) | YES | false | \"\""); assertEqualsInOrder(expected, actual); }