diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java index c6d4201256515..13a3019a5c760 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java @@ -28,6 +28,7 @@ import org.apache.flink.table.catalog.TableChange.ModifyDefinitionQuery; import org.apache.flink.table.catalog.TableChange.ModifyRefreshHandler; import org.apache.flink.table.catalog.TableChange.ModifyRefreshStatus; +import org.apache.flink.table.catalog.TableChange.ModifyStartMode; import org.apache.flink.table.operations.ddl.AlterTableChangeOperation; import java.util.List; @@ -120,6 +121,9 @@ private static String toString(TableChange tableChange) { ModifyDefinitionQuery definitionQuery = (ModifyDefinitionQuery) tableChange; return String.format( " MODIFY DEFINITION QUERY TO '%s'", definitionQuery.getDefinitionQuery()); + } else if (tableChange instanceof ModifyStartMode) { + ModifyStartMode startMode = (ModifyStartMode) tableChange; + return String.format(" MODIFY START_MODE TO '%s'", startMode.getStartMode()); } else { return AlterTableChangeOperation.toString(tableChange); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java index 8602812b6c3b8..54a55414a8e9c 100644 --- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java @@ -27,6 +27,7 @@ import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.Column.MetadataColumn; +import org.apache.flink.table.catalog.StartMode; import org.apache.flink.table.catalog.TableChange; import org.apache.flink.table.catalog.TableChange.AddColumn; import org.apache.flink.table.catalog.TableChange.AddDistribution; @@ -44,6 +45,7 @@ import org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType; import org.apache.flink.table.catalog.TableChange.ModifyRefreshHandler; import org.apache.flink.table.catalog.TableChange.ModifyRefreshStatus; +import org.apache.flink.table.catalog.TableChange.ModifyStartMode; import org.apache.flink.table.catalog.TableChange.ModifyUniqueConstraint; import org.apache.flink.table.catalog.TableChange.ModifyWatermark; import org.apache.flink.table.catalog.TableChange.ResetOption; @@ -84,6 +86,7 @@ public class MaterializedTableChangeHandler { private int droppedPersistedCnt = 0; private String originalQuery; private String expandedQuery; + private StartMode startMode; private final Map options; private final List validationErrors = new ArrayList<>(); @@ -102,6 +105,7 @@ public MaterializedTableChangeHandler(CatalogMaterializedTable oldTable) { } originalQuery = oldTable.getOriginalQuery(); expandedQuery = oldTable.getExpandedQuery(); + startMode = oldTable.getStartMode().orElse(null); this.oldTable = oldTable; this.options = new HashMap<>(oldTable.getOptions()); } @@ -170,6 +174,7 @@ public static CatalogMaterializedTable buildNewMaterializedTable( .refreshStatus(context.getRefreshStatus()) .refreshHandlerDescription(context.getRefreshHandlerDesc()) 
.serializedRefreshHandler(context.getRefreshHandlerBytes()) + .startMode(context.getStartMode()) .build(); } @@ -222,6 +227,8 @@ private static HandlerRegistry createHandlerRegistry() { registry.register(SetOption.class, MaterializedTableChangeHandler::setTableOption); registry.register(ResetOption.class, MaterializedTableChangeHandler::resetTableOption); + registry.register(ModifyStartMode.class, MaterializedTableChangeHandler::modifyStartMode); + return registry; } @@ -284,6 +291,10 @@ public byte[] getRefreshHandlerBytes() { return refreshHandlerBytes; } + public StartMode getStartMode() { + return startMode; + } + @Nullable public String getRefreshHandlerDesc() { return refreshHandlerDesc; @@ -409,6 +420,10 @@ private void modifyRefreshStatus(ModifyRefreshStatus modifyRefreshStatus) { refreshStatus = modifyRefreshStatus.getRefreshStatus(); } + private void modifyStartMode(ModifyStartMode modifyStartMode) { + startMode = modifyStartMode.getStartMode(); + } + private void addDistribution(AddDistribution addDistribution) { distribution = addDistribution.getDistribution(); } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java index b4a5084e9f60e..f05ebc1e089e7 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java @@ -25,7 +25,6 @@ import javax.annotation.Nullable; import java.time.Duration; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -243,8 +242,8 @@ class Builder { private Schema schema; private String comment; private TableDistribution distribution = null; - private List partitionKeys = Collections.emptyList(); - private Map options = 
Collections.emptyMap(); + private List partitionKeys = List.of(); + private Map options = Map.of(); private @Nullable Long snapshot; private String originalQuery; private String expandedQuery; @@ -254,6 +253,7 @@ class Builder { private RefreshStatus refreshStatus; private @Nullable String refreshHandlerDescription; private @Nullable byte[] serializedRefreshHandler; + private StartMode startMode; private Builder() {} @@ -341,6 +341,11 @@ public Builder distribution(@Nullable TableDistribution distribution) { return this; } + public Builder startMode(StartMode startMode) { + this.startMode = startMode; + return this; + } + public CatalogMaterializedTable build() { return new DefaultCatalogMaterializedTable( schema, @@ -356,7 +361,8 @@ public CatalogMaterializedTable build() { refreshMode, refreshStatus, refreshHandlerDescription, - serializedRefreshHandler); + serializedRefreshHandler, + startMode); } } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java index 78eca0ce89714..7b7439994fc0d 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java @@ -52,6 +52,7 @@ public class DefaultCatalogMaterializedTable implements CatalogMaterializedTable private final RefreshStatus refreshStatus; private final @Nullable String refreshHandlerDescription; private final @Nullable byte[] serializedRefreshHandler; + private final @Nullable StartMode startMode; protected DefaultCatalogMaterializedTable( Schema schema, @@ -67,7 +68,8 @@ protected DefaultCatalogMaterializedTable( @Nullable RefreshMode refreshMode, RefreshStatus refreshStatus, @Nullable String refreshHandlerDescription, - @Nullable byte[] 
serializedRefreshHandler) { + @Nullable byte[] serializedRefreshHandler, + @Nullable StartMode startMode) { this.schema = checkNotNull(schema, "Schema must not be null."); this.comment = comment; this.distribution = distribution; @@ -83,6 +85,7 @@ protected DefaultCatalogMaterializedTable( this.refreshStatus = checkNotNull(refreshStatus, "Refresh status must not be null."); this.refreshHandlerDescription = refreshHandlerDescription; this.serializedRefreshHandler = serializedRefreshHandler; + this.startMode = startMode; checkArgument( options.entrySet().stream() @@ -136,7 +139,8 @@ public CatalogBaseTable copy() { refreshMode, refreshStatus, refreshHandlerDescription, - serializedRefreshHandler); + serializedRefreshHandler, + startMode); } @Override @@ -155,7 +159,8 @@ public CatalogMaterializedTable copy(Map options) { refreshMode, refreshStatus, refreshHandlerDescription, - serializedRefreshHandler); + serializedRefreshHandler, + startMode); } @Override @@ -177,7 +182,8 @@ public CatalogMaterializedTable copy( refreshMode, refreshStatus, refreshHandlerDescription, - serializedRefreshHandler); + serializedRefreshHandler, + startMode); } @Override @@ -225,6 +231,11 @@ public RefreshStatus getRefreshStatus() { return refreshStatus; } + @Override + public Optional getStartMode() { + return Optional.ofNullable(startMode); + } + @Override public Optional getRefreshHandlerDescription() { return Optional.ofNullable(refreshHandlerDescription); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java index 9c4d1969601ee..2d6d10bdc5385 100644 --- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java @@ -21,6 +21,7 @@ import org.apache.flink.sql.parser.ddl.materializedtable.SqlCreateMaterializedTable; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.config.MaterializedTableConfigOptions; import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode; import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; @@ -29,6 +30,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.StartMode; import org.apache.flink.table.catalog.TableDistribution; import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.planner.operations.PlannerQueryOperation; @@ -78,6 +80,8 @@ protected interface MergeContext { ResolvedSchema getMergedQuerySchema(); RefreshMode getMergedRefreshMode(); + + StartMode getMergedStartMode(); } protected abstract MergeContext getMergeContext( @@ -99,6 +103,17 @@ protected final IntervalFreshness getDerivedFreshness(T sqlCreateMaterializedTab .orElse(null); } + protected final StartMode getStartMode(T sqlCreateMaterializedTable, ConvertContext context) { + StartMode startMode = + MaterializedTableUtils.getStartMode(sqlCreateMaterializedTable.getStartMode()); + if (startMode != null) { + return startMode; + } + return StartMode.of( + context.getTableConfig() + .get(MaterializedTableConfigOptions.MATERIALIZED_TABLE_DEFAULT_START_MODE)); + } + 
protected final ResolvedSchema getQueryResolvedSchema( T sqlCreateMaterializedTable, ConvertContext context) { SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery(); @@ -160,6 +175,8 @@ protected final ResolvedCatalogMaterializedTable getResolvedCatalogMaterializedT final RefreshMode refreshMode = getDerivedRefreshMode(logicalRefreshMode); + final StartMode startMode = mergeContext.getMergedStartMode(); + return context.getCatalogManager() .resolveCatalogMaterializedTable( CatalogMaterializedTable.newBuilder() @@ -174,6 +191,7 @@ protected final ResolvedCatalogMaterializedTable getResolvedCatalogMaterializedT .logicalRefreshMode(logicalRefreshMode) .refreshMode(refreshMode) .refreshStatus(RefreshStatus.INITIALIZING) + .startMode(startMode) .build()); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java index 198a747406b06..0d81346262c83 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java @@ -29,6 +29,7 @@ import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.SchemaResolver; +import org.apache.flink.table.catalog.StartMode; import org.apache.flink.table.catalog.TableChange; import org.apache.flink.table.catalog.TableDistribution; import org.apache.flink.table.catalog.UniqueConstraint; @@ -137,6 +138,15 @@ private Function> buildTable throw new 
ValidationException("Changing of REFRESH MODE is unsupported"); } + final StartMode newStartMode = mergeContext.getMergedStartMode(); + final StartMode oldStartMode = + oldTable.getStartMode() + .orElseThrow( + () -> new ValidationException("START_MODE must not be null")); + if (!Objects.equals(oldStartMode, newStartMode)) { + changes.add(TableChange.modifyStartMode(newStartMode)); + } + return changes; }; } @@ -213,7 +223,7 @@ private List getSchemaTableChanges( private Optional getConstraintChange( final ResolvedSchema oldSchema, final ResolvedSchema newSchema, - boolean hasConstraintDefinition) { + final boolean hasConstraintDefinition) { final UniqueConstraint oldConstraint = oldSchema.getPrimaryKey().orElse(null); final UniqueConstraint newConstraint = newSchema.getPrimaryKey().orElse(null); if (hasConstraintDefinition && !Objects.equals(oldConstraint, newConstraint)) { @@ -279,11 +289,8 @@ public boolean hasSchemaDefinition() { @Override public boolean hasConstraintDefinition() { - if (!sqlCreateMaterializedTable.getTableConstraints().isEmpty()) { - return true; - } - - return hasSchemaDefinition(); + return !sqlCreateMaterializedTable.getTableConstraints().isEmpty() + || hasSchemaDefinition(); } @Override @@ -354,6 +361,11 @@ public RefreshMode getMergedRefreshMode() { return getDerivedRefreshMode( getDerivedLogicalRefreshMode(sqlCreateMaterializedTable)); } + + @Override + public StartMode getMergedStartMode() { + return getStartMode(sqlCreateMaterializedTable, context); + } }; } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java index 6f50a3d36e790..99fd660e8365f 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java +++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java @@ -24,6 +24,8 @@ import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlMetadataColumn; import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlRegularColumn; import org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableSchema; +import org.apache.flink.sql.parser.ddl.materializedtable.SqlStartMode; +import org.apache.flink.sql.parser.ddl.materializedtable.SqlStartMode.SqlStartModeKind; import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogMaterializedTable; @@ -36,6 +38,8 @@ import org.apache.flink.table.catalog.Interval.TimeUnit; import org.apache.flink.table.catalog.IntervalFreshness; import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.StartMode; +import org.apache.flink.table.catalog.StartMode.StartModeKind; import org.apache.flink.table.catalog.TableChange; import org.apache.flink.table.catalog.TableChange.ColumnPosition; import org.apache.flink.table.planner.operations.PlannerQueryOperation; @@ -47,10 +51,13 @@ import org.apache.calcite.sql.SqlIntervalLiteral.IntervalValue; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlTimestampLiteral; import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.TimestampString; import org.apache.commons.lang3.StringUtils; import java.math.BigDecimal; +import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -77,6 +84,51 @@ public static IntervalFreshness getMaterializedTableFreshness( return IntervalFreshness.of(getFreshnessInterval(sqlIntervalLiteral)); } + public static StartMode getStartMode(SqlStartMode sqlStartMode) { + if (sqlStartMode == null) { + return null; + } + + SqlStartModeKind 
sqlStartModeKind = sqlStartMode.getKind(); + StartModeKind startModeKind = deriveStartModeKind(sqlStartModeKind); + switch (sqlStartModeKind) { + case FROM_NOW: + case RESUME_OR_FROM_NOW: + SqlIntervalLiteral intervalLiteral = sqlStartMode.getIntervalLiteral(); + if (intervalLiteral == null) { + return StartMode.of(startModeKind, null, false); + } + + Interval interval = intervalFrom(intervalLiteral, "start mode"); + validateIntervalValuePositive(interval.getInterval(), "start mode"); + return StartMode.of(startModeKind, interval); + + case RESUME_OR_FROM_BEGINNING: + case FROM_BEGINNING: + return StartMode.of(startModeKind); + case RESUME_OR_FROM_TIMESTAMP: + case FROM_TIMESTAMP: + SqlTimestampLiteral timestampLiteral = sqlStartMode.getTimestampLiteral(); + if (timestampLiteral == null) { + return StartMode.of(startModeKind, null, false); + } + + TimestampString timestampString = + timestampLiteral.getValueAs(TimestampString.class); + SqlTypeName timestampTypeName = timestampLiteral.getTypeName(); + long millis = timestampString.getMillisSinceEpoch(); + Instant timestamp = Instant.ofEpochMilli(millis); + return StartMode.of( + startModeKind, + timestamp, + timestampTypeName == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE); + + default: + throw new ValidationException( + String.format("Unsupported start mode: %s.", sqlStartModeKind)); + } + } + private static Interval getFreshnessInterval(SqlIntervalLiteral sqlIntervalLiteral) { final IntervalValue intervalValue = sqlIntervalLiteral.getValueAs(IntervalValue.class); final SqlTypeName typeName = intervalValue.getIntervalQualifier().typeName(); @@ -303,6 +355,26 @@ private static boolean isDateTimeInterval(SqlTypeName typeName) { || typeName == SqlTypeName.INTERVAL_SECOND; } + private static StartModeKind deriveStartModeKind(SqlStartModeKind sqlStartModeKind) { + switch (sqlStartModeKind) { + case FROM_NOW: + return StartModeKind.FROM_NOW; + case RESUME_OR_FROM_NOW: + return StartModeKind.RESUME_OR_FROM_NOW; + case 
RESUME_OR_FROM_BEGINNING: + return StartModeKind.RESUME_OR_FROM_BEGINNING; + case FROM_BEGINNING: + return StartModeKind.FROM_BEGINNING; + case RESUME_OR_FROM_TIMESTAMP: + return StartModeKind.RESUME_OR_FROM_TIMESTAMP; + case FROM_TIMESTAMP: + return StartModeKind.FROM_TIMESTAMP; + default: + throw new ValidationException( + String.format("Unsupported start mode: %s.", sqlStartModeKind)); + } + } + // Since it is only for query change, then check only persisted columns which could be // changed/added/dropped with such change private static boolean isSchemaChanged(ResolvedSchema oldSchema, ResolvedSchema newSchema) { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java index 9ba201687662b..4082919ef0251 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java @@ -23,7 +23,11 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.Interval; +import org.apache.flink.table.catalog.Interval.TimeUnit; import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.StartMode; +import org.apache.flink.table.catalog.StartMode.StartModeKind; import org.apache.flink.table.catalog.TableChange; import org.apache.flink.table.catalog.TableDistribution; import org.apache.flink.table.catalog.UniqueConstraint; @@ -37,8 +41,11 @@ import 
org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; +import java.time.Instant; import java.util.Collection; import java.util.List; import java.util.Map; @@ -521,6 +528,123 @@ void testCreateOrAlterMaterializedTableWithCommentChange() + " MODIFY `t` COMMENT 'Timestamp Comment'"); } + @ParameterizedTest + @EnumSource( + value = StartModeKind.class, + names = { + "FROM_NOW", + "RESUME_OR_FROM_NOW", + "RESUME_OR_FROM_BEGINNING", + "FROM_BEGINNING" + }) + void testCreateOrAlterMaterializedTableWithNotChangedStartMode(StartModeKind kind) + throws TableAlreadyExistException, DatabaseNotExistException { + final String prepSql = + String.format( + "CREATE MATERIALIZED TABLE mt1\n" + + "START_MODE=%s\n" + + "AS SELECT * FROM t1", + kind.toString()); + createMaterializedTableInCatalog(prepSql, "mt1"); + + final String sql = + String.format( + "CREATE OR ALTER MATERIALIZED TABLE mt1\n" + + "START_MODE=%s\n" + + "AS SELECT * FROM t1", + kind); + Operation operation = parse(sql); + + assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); + + FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation; + assertThat(op.getTableChanges()).isEmpty(); + } + + @ParameterizedTest + @EnumSource( + value = StartModeKind.class, + names = { + "FROM_NOW", + "RESUME_OR_FROM_NOW", + "RESUME_OR_FROM_BEGINNING", + "FROM_BEGINNING" + }) + void testCreateOrAlterMaterializedTableWithFinalDefaultStartMode(StartModeKind kind) + throws TableAlreadyExistException, DatabaseNotExistException { + final String prepSql = + String.format( + "CREATE MATERIALIZED TABLE mt1\n" + + "START_MODE=%s\n" + + "AS SELECT * FROM t1", + kind.toString()); + createMaterializedTableInCatalog(prepSql, "mt1"); + + final String sql = "CREATE OR 
ALTER MATERIALIZED TABLE mt1\n" + "AS SELECT * FROM t1"; + Operation operation = parse(sql); + + assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); + + FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation; + if (kind == StartModeKind.FROM_BEGINNING) { + assertThat(op.getTableChanges()).isEmpty(); + } else { + assertThat(op.getTableChanges()) + .contains( + TableChange.modifyStartMode( + StartMode.of(StartModeKind.FROM_BEGINNING))); + } + } + + @ParameterizedTest + @MethodSource("startModeAlterCases") + void testCreateOrAlterMaterializedTableWithChangedStartMode( + String newStartModeClause, StartMode newStartMode) + throws TableAlreadyExistException, DatabaseNotExistException { + final String prepSql = + "CREATE MATERIALIZED TABLE mt1\n" + + "START_MODE=FROM_BEGINNING\n" + + "AS SELECT * FROM t1"; + createMaterializedTableInCatalog(prepSql, "mt1"); + + final String sql = + "CREATE OR ALTER MATERIALIZED TABLE mt1\n" + + "START_MODE = " + + newStartModeClause + + "\n" + + "AS SELECT * FROM t1"; + Operation operation = parse(sql); + + assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); + + FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation; + assertThat(op.getTableChanges()).containsExactly(TableChange.modifyStartMode(newStartMode)); + } + + private static Collection startModeAlterCases() { + return List.of( + Arguments.of( + "RESUME_OR_FROM_BEGINNING", + StartMode.of(StartModeKind.RESUME_OR_FROM_BEGINNING)), + Arguments.of("FROM_NOW", StartMode.of(StartModeKind.FROM_NOW)), + Arguments.of( + "FROM_NOW(INTERVAL '2' HOUR)", + StartMode.of(StartModeKind.FROM_NOW, Interval.of(2, TimeUnit.HOUR))), + Arguments.of( + "FROM_TIMESTAMP(TIMESTAMP '1234-12-10 11:22:33')", + StartMode.of( + StartModeKind.FROM_TIMESTAMP, + Instant.parse("1234-12-10T11:22:33Z"))), + Arguments.of("RESUME_OR_FROM_NOW", 
StartMode.of(StartModeKind.RESUME_OR_FROM_NOW)), + Arguments.of( + "RESUME_OR_FROM_TIMESTAMP(TIMESTAMP '2025-01-15 10:00:00')", + StartMode.of( + StartModeKind.RESUME_OR_FROM_TIMESTAMP, + Instant.parse("2025-01-15T10:00:00Z")))); + } + private void createMaterializedTableInCatalog(String sql, String materializedTableName) throws TableAlreadyExistException, DatabaseNotExistException { final ObjectPath objectPath =