Skip to content

Commit a014737

Browse files
authored
[FLINK-39303][table] Support START_MODE for CREATE path for MATERIALIZED TABLE
1 parent 3e8a585 commit a014737

8 files changed

Lines changed: 276 additions & 14 deletions

File tree

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.flink.table.catalog.TableChange.ModifyDefinitionQuery;
2929
import org.apache.flink.table.catalog.TableChange.ModifyRefreshHandler;
3030
import org.apache.flink.table.catalog.TableChange.ModifyRefreshStatus;
31+
import org.apache.flink.table.catalog.TableChange.ModifyStartMode;
3132
import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
3233

3334
import java.util.List;
@@ -120,6 +121,9 @@ private static String toString(TableChange tableChange) {
120121
ModifyDefinitionQuery definitionQuery = (ModifyDefinitionQuery) tableChange;
121122
return String.format(
122123
" MODIFY DEFINITION QUERY TO '%s'", definitionQuery.getDefinitionQuery());
124+
} else if (tableChange instanceof ModifyStartMode) {
125+
ModifyStartMode startMode = (ModifyStartMode) tableChange;
126+
return String.format(" MODIFY START_MODE TO '%s'", startMode.getStartMode());
123127
} else {
124128
return AlterTableChangeOperation.toString(tableChange);
125129
}

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.table.catalog.CatalogMaterializedTable;
2828
import org.apache.flink.table.catalog.Column;
2929
import org.apache.flink.table.catalog.Column.MetadataColumn;
30+
import org.apache.flink.table.catalog.StartMode;
3031
import org.apache.flink.table.catalog.TableChange;
3132
import org.apache.flink.table.catalog.TableChange.AddColumn;
3233
import org.apache.flink.table.catalog.TableChange.AddDistribution;
@@ -44,6 +45,7 @@
4445
import org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType;
4546
import org.apache.flink.table.catalog.TableChange.ModifyRefreshHandler;
4647
import org.apache.flink.table.catalog.TableChange.ModifyRefreshStatus;
48+
import org.apache.flink.table.catalog.TableChange.ModifyStartMode;
4749
import org.apache.flink.table.catalog.TableChange.ModifyUniqueConstraint;
4850
import org.apache.flink.table.catalog.TableChange.ModifyWatermark;
4951
import org.apache.flink.table.catalog.TableChange.ResetOption;
@@ -84,6 +86,7 @@ public class MaterializedTableChangeHandler {
8486
private int droppedPersistedCnt = 0;
8587
private String originalQuery;
8688
private String expandedQuery;
89+
private StartMode startMode;
8790
private final Map<String, String> options;
8891
private final List<String> validationErrors = new ArrayList<>();
8992

@@ -102,6 +105,7 @@ public MaterializedTableChangeHandler(CatalogMaterializedTable oldTable) {
102105
}
103106
originalQuery = oldTable.getOriginalQuery();
104107
expandedQuery = oldTable.getExpandedQuery();
108+
startMode = oldTable.getStartMode().orElse(null);
105109
this.oldTable = oldTable;
106110
this.options = new HashMap<>(oldTable.getOptions());
107111
}
@@ -170,6 +174,7 @@ public static CatalogMaterializedTable buildNewMaterializedTable(
170174
.refreshStatus(context.getRefreshStatus())
171175
.refreshHandlerDescription(context.getRefreshHandlerDesc())
172176
.serializedRefreshHandler(context.getRefreshHandlerBytes())
177+
.startMode(context.getStartMode())
173178
.build();
174179
}
175180

@@ -222,6 +227,8 @@ private static HandlerRegistry createHandlerRegistry() {
222227
registry.register(SetOption.class, MaterializedTableChangeHandler::setTableOption);
223228
registry.register(ResetOption.class, MaterializedTableChangeHandler::resetTableOption);
224229

230+
registry.register(ModifyStartMode.class, MaterializedTableChangeHandler::modifyStartMode);
231+
225232
return registry;
226233
}
227234

@@ -284,6 +291,10 @@ public byte[] getRefreshHandlerBytes() {
284291
return refreshHandlerBytes;
285292
}
286293

294+
public StartMode getStartMode() {
295+
return startMode;
296+
}
297+
287298
@Nullable
288299
public String getRefreshHandlerDesc() {
289300
return refreshHandlerDesc;
@@ -409,6 +420,10 @@ private void modifyRefreshStatus(ModifyRefreshStatus modifyRefreshStatus) {
409420
refreshStatus = modifyRefreshStatus.getRefreshStatus();
410421
}
411422

423+
private void modifyStartMode(ModifyStartMode modifyStartMode) {
424+
startMode = modifyStartMode.getStartMode();
425+
}
426+
412427
private void addDistribution(AddDistribution addDistribution) {
413428
distribution = addDistribution.getDistribution();
414429
}

flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import javax.annotation.Nullable;
2626

2727
import java.time.Duration;
28-
import java.util.Collections;
2928
import java.util.List;
3029
import java.util.Map;
3130
import java.util.Optional;
@@ -243,8 +242,8 @@ class Builder {
243242
private Schema schema;
244243
private String comment;
245244
private TableDistribution distribution = null;
246-
private List<String> partitionKeys = Collections.emptyList();
247-
private Map<String, String> options = Collections.emptyMap();
245+
private List<String> partitionKeys = List.of();
246+
private Map<String, String> options = Map.of();
248247
private @Nullable Long snapshot;
249248
private String originalQuery;
250249
private String expandedQuery;
@@ -254,6 +253,7 @@ class Builder {
254253
private RefreshStatus refreshStatus;
255254
private @Nullable String refreshHandlerDescription;
256255
private @Nullable byte[] serializedRefreshHandler;
256+
private StartMode startMode;
257257

258258
private Builder() {}
259259

@@ -341,6 +341,11 @@ public Builder distribution(@Nullable TableDistribution distribution) {
341341
return this;
342342
}
343343

344+
public Builder startMode(StartMode startMode) {
345+
this.startMode = startMode;
346+
return this;
347+
}
348+
344349
public CatalogMaterializedTable build() {
345350
return new DefaultCatalogMaterializedTable(
346351
schema,
@@ -356,7 +361,8 @@ public CatalogMaterializedTable build() {
356361
refreshMode,
357362
refreshStatus,
358363
refreshHandlerDescription,
359-
serializedRefreshHandler);
364+
serializedRefreshHandler,
365+
startMode);
360366
}
361367
}
362368
}

flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class DefaultCatalogMaterializedTable implements CatalogMaterializedTable
5252
private final RefreshStatus refreshStatus;
5353
private final @Nullable String refreshHandlerDescription;
5454
private final @Nullable byte[] serializedRefreshHandler;
55+
private final @Nullable StartMode startMode;
5556

5657
protected DefaultCatalogMaterializedTable(
5758
Schema schema,
@@ -67,7 +68,8 @@ protected DefaultCatalogMaterializedTable(
6768
@Nullable RefreshMode refreshMode,
6869
RefreshStatus refreshStatus,
6970
@Nullable String refreshHandlerDescription,
70-
@Nullable byte[] serializedRefreshHandler) {
71+
@Nullable byte[] serializedRefreshHandler,
72+
@Nullable StartMode startMode) {
7173
this.schema = checkNotNull(schema, "Schema must not be null.");
7274
this.comment = comment;
7375
this.distribution = distribution;
@@ -83,6 +85,7 @@ protected DefaultCatalogMaterializedTable(
8385
this.refreshStatus = checkNotNull(refreshStatus, "Refresh status must not be null.");
8486
this.refreshHandlerDescription = refreshHandlerDescription;
8587
this.serializedRefreshHandler = serializedRefreshHandler;
88+
this.startMode = startMode;
8689

8790
checkArgument(
8891
options.entrySet().stream()
@@ -136,7 +139,8 @@ public CatalogBaseTable copy() {
136139
refreshMode,
137140
refreshStatus,
138141
refreshHandlerDescription,
139-
serializedRefreshHandler);
142+
serializedRefreshHandler,
143+
startMode);
140144
}
141145

142146
@Override
@@ -155,7 +159,8 @@ public CatalogMaterializedTable copy(Map<String, String> options) {
155159
refreshMode,
156160
refreshStatus,
157161
refreshHandlerDescription,
158-
serializedRefreshHandler);
162+
serializedRefreshHandler,
163+
startMode);
159164
}
160165

161166
@Override
@@ -177,7 +182,8 @@ public CatalogMaterializedTable copy(
177182
refreshMode,
178183
refreshStatus,
179184
refreshHandlerDescription,
180-
serializedRefreshHandler);
185+
serializedRefreshHandler,
186+
startMode);
181187
}
182188

183189
@Override
@@ -225,6 +231,11 @@ public RefreshStatus getRefreshStatus() {
225231
return refreshStatus;
226232
}
227233

234+
@Override
235+
public Optional<StartMode> getStartMode() {
236+
return Optional.ofNullable(startMode);
237+
}
238+
228239
@Override
229240
public Optional<String> getRefreshHandlerDescription() {
230241
return Optional.ofNullable(refreshHandlerDescription);

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.sql.parser.ddl.materializedtable.SqlCreateMaterializedTable;
2222
import org.apache.flink.table.api.Schema;
2323
import org.apache.flink.table.api.ValidationException;
24+
import org.apache.flink.table.api.config.MaterializedTableConfigOptions;
2425
import org.apache.flink.table.catalog.CatalogMaterializedTable;
2526
import org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
2627
import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
@@ -29,6 +30,7 @@
2930
import org.apache.flink.table.catalog.ObjectIdentifier;
3031
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
3132
import org.apache.flink.table.catalog.ResolvedSchema;
33+
import org.apache.flink.table.catalog.StartMode;
3234
import org.apache.flink.table.catalog.TableDistribution;
3335
import org.apache.flink.table.catalog.UnresolvedIdentifier;
3436
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
@@ -78,6 +80,8 @@ protected interface MergeContext {
7880
ResolvedSchema getMergedQuerySchema();
7981

8082
RefreshMode getMergedRefreshMode();
83+
84+
StartMode getMergedStartMode();
8185
}
8286

8387
protected abstract MergeContext getMergeContext(
@@ -99,6 +103,17 @@ protected final IntervalFreshness getDerivedFreshness(T sqlCreateMaterializedTab
99103
.orElse(null);
100104
}
101105

106+
protected final StartMode getStartMode(T sqlCreateMaterializedTable, ConvertContext context) {
107+
StartMode startMode =
108+
MaterializedTableUtils.getStartMode(sqlCreateMaterializedTable.getStartMode());
109+
if (startMode != null) {
110+
return startMode;
111+
}
112+
return StartMode.of(
113+
context.getTableConfig()
114+
.get(MaterializedTableConfigOptions.MATERIALIZED_TABLE_DEFAULT_START_MODE));
115+
}
116+
102117
protected final ResolvedSchema getQueryResolvedSchema(
103118
T sqlCreateMaterializedTable, ConvertContext context) {
104119
SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery();
@@ -160,6 +175,8 @@ protected final ResolvedCatalogMaterializedTable getResolvedCatalogMaterializedT
160175

161176
final RefreshMode refreshMode = getDerivedRefreshMode(logicalRefreshMode);
162177

178+
final StartMode startMode = mergeContext.getMergedStartMode();
179+
163180
return context.getCatalogManager()
164181
.resolveCatalogMaterializedTable(
165182
CatalogMaterializedTable.newBuilder()
@@ -174,6 +191,7 @@ protected final ResolvedCatalogMaterializedTable getResolvedCatalogMaterializedT
174191
.logicalRefreshMode(logicalRefreshMode)
175192
.refreshMode(refreshMode)
176193
.refreshStatus(RefreshStatus.INITIALIZING)
194+
.startMode(startMode)
177195
.build());
178196
}
179197

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
3030
import org.apache.flink.table.catalog.ResolvedSchema;
3131
import org.apache.flink.table.catalog.SchemaResolver;
32+
import org.apache.flink.table.catalog.StartMode;
3233
import org.apache.flink.table.catalog.TableChange;
3334
import org.apache.flink.table.catalog.TableDistribution;
3435
import org.apache.flink.table.catalog.UniqueConstraint;
@@ -137,6 +138,15 @@ private Function<ResolvedCatalogMaterializedTable, List<TableChange>> buildTable
137138
throw new ValidationException("Changing of REFRESH MODE is unsupported");
138139
}
139140

141+
final StartMode newStartMode = mergeContext.getMergedStartMode();
142+
final StartMode oldStartMode =
143+
oldTable.getStartMode()
144+
.orElseThrow(
145+
() -> new ValidationException("START_MODE must not be null"));
146+
if (!Objects.equals(oldStartMode, newStartMode)) {
147+
changes.add(TableChange.modifyStartMode(newStartMode));
148+
}
149+
140150
return changes;
141151
};
142152
}
@@ -213,7 +223,7 @@ private List<TableChange> getSchemaTableChanges(
213223
private Optional<TableChange> getConstraintChange(
214224
final ResolvedSchema oldSchema,
215225
final ResolvedSchema newSchema,
216-
boolean hasConstraintDefinition) {
226+
final boolean hasConstraintDefinition) {
217227
final UniqueConstraint oldConstraint = oldSchema.getPrimaryKey().orElse(null);
218228
final UniqueConstraint newConstraint = newSchema.getPrimaryKey().orElse(null);
219229
if (hasConstraintDefinition && !Objects.equals(oldConstraint, newConstraint)) {
@@ -279,11 +289,8 @@ public boolean hasSchemaDefinition() {
279289

280290
@Override
281291
public boolean hasConstraintDefinition() {
282-
if (!sqlCreateMaterializedTable.getTableConstraints().isEmpty()) {
283-
return true;
284-
}
285-
286-
return hasSchemaDefinition();
292+
return !sqlCreateMaterializedTable.getTableConstraints().isEmpty()
293+
|| hasSchemaDefinition();
287294
}
288295

289296
@Override
@@ -354,6 +361,11 @@ public RefreshMode getMergedRefreshMode() {
354361
return getDerivedRefreshMode(
355362
getDerivedLogicalRefreshMode(sqlCreateMaterializedTable));
356363
}
364+
365+
@Override
366+
public StartMode getMergedStartMode() {
367+
return getStartMode(sqlCreateMaterializedTable, context);
368+
}
357369
};
358370
}
359371
}

0 commit comments

Comments
 (0)