From 06aa0998cd2b24b9617cc72d60e45b0aec4593f1 Mon Sep 17 00:00:00 2001 From: SkylerLin <44233950+linguoxuan@users.noreply.github.com> Date: Tue, 7 Apr 2026 15:19:17 +0800 Subject: [PATCH] [FLINK-39373][pipeline][starrocks] Fix the error when modifying the default value of a column --- .../sink/StarRocksEnrichedCatalog.java | 57 +++++++++++++++++++ .../sink/StarRocksMetadataApplierITCase.java | 35 ++++++++++++ 2 files changed, 92 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java index 70f1f50f7eb..9bfde719d9e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java @@ -28,6 +28,9 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.util.Optional; /** An enriched {@code StarRocksCatalog} with more schema evolution abilities. */ @@ -112,6 +115,23 @@ public void alterColumnType(String databaseName, String tableName, StarRocksColu Preconditions.checkArgument( !StringUtils.isNullOrWhitespaceOnly(column.getColumnName()), "column name cannot be null or empty."); + if (column.getDefaultValue().isEmpty()) { + Optional existingDefault = + getColumnDefaultValue(databaseName, tableName, column.getColumnName()); + if (existingDefault.isPresent()) { + column = + new StarRocksColumn.Builder() + .setColumnName(column.getColumnName()) + .setOrdinalPosition(column.getOrdinalPosition()) + .setDataType(column.getDataType()) + .setNullable(column.isNullable()) + .setDefaultValue(existingDefault.get()) + .setColumnSize(column.getColumnSize().orElse(null)) + .setDecimalDigits(column.getDecimalDigits().orElse(null)) + .setColumnComment(column.getColumnComment().orElse(null)) + .build(); + } + } String alterSql = buildAlterColumnTypeSql(databaseName, tableName, buildColumnStmt(column)); try { long startTimeMillis = System.currentTimeMillis(); @@ -171,6 +191,43 @@ private void executeUpdateStatement(String sql) throws StarRocksCatalogException } } + private Optional getColumnDefaultValue( + String databaseName, String tableName, String columnName) { + String querySql = + "SELECT `COLUMN_DEFAULT` FROM `information_schema`.`COLUMNS` " + + "WHERE `TABLE_SCHEMA`=? AND `TABLE_NAME`=? AND `COLUMN_NAME`=?"; + try (Connection connection = getConnection(); + PreparedStatement statement = connection.prepareStatement(querySql)) { + statement.setObject(1, databaseName); + statement.setObject(2, tableName); + statement.setObject(3, columnName); + try (ResultSet resultSet = statement.executeQuery()) { + if (resultSet.next()) { + String defaultValue = resultSet.getString("COLUMN_DEFAULT"); + return Optional.ofNullable(defaultValue); + } + } + } catch (Exception e) { + LOG.warn( + "Failed to get column default value for {}.{}.{}", + databaseName, + tableName, + columnName, + e); + } + return Optional.empty(); + } + + private Connection getConnection() { + try { + Method m = getClass().getSuperclass().getDeclaredMethod("getConnection"); + m.setAccessible(true); + return (Connection) m.invoke(this); + } catch (InvocationTargetException | NoSuchMethodException | IllegalAccessException e) { + throw new RuntimeException("Failed to get connection from StarRocksCatalog", e); + } + } + private void checkTableArgument(String databaseName, String tableName) { Preconditions.checkArgument( !StringUtils.isNullOrWhitespaceOnly(databaseName), diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java index fd1d3c06845..ddb1b7e8f85 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java @@ -628,6 +628,41 @@ void testMysqlDefaultTimestampValueWithMicrosInCreateTable() throws Exception { assertEqualsInOrder(expected, actual); } + @Test + void testAlterColumnTypePreservesDefaultValue() throws Exception { + TableId tableId = + TableId.tableId( + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME); + + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null, "unknown")) + .primaryKey("id") + .build(); + + List events = new ArrayList<>(); + events.add(new CreateTableEvent(tableId, schema)); + events.add( + new AlterColumnTypeEvent( + tableId, Collections.singletonMap("name", DataTypes.VARCHAR(19)))); + + runJobWithEvents(events); + waitAlterDone(tableId, 60000L); + + List actual = inspectTableSchema(tableId); + + List expected = + Arrays.asList( + "id | int | NO | true | null", + "number | double | YES | false | null", + "name | varchar(57) | YES | false | unknown"); + + assertEqualsInOrder(expected, actual); + } + @Test void testMysqlDefaultTimestampValueWithMicrosInAddColumn() throws Exception { TableId tableId =