Skip to content

Commit 85e3cf9

Browse files
author
litiliu
committed
Support disabling automatic table creation and fail when target table does not exist
1 parent 343778f commit 85e3cf9

16 files changed

Lines changed: 812 additions & 29 deletions

File tree

docs/content.zh/docs/connectors/pipeline-connectors/fluss.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,13 @@ Pipeline Connector Options
9999
<td>String</td>
100100
<td>用于建立与 Fluss 集群初始连接的主机/端口对列表。 </td>
101101
</tr>
102+
<tr>
103+
<td>schema.create.mode</td>
104+
<td>optional</td>
105+
<td style="word-wrap: break-word;">CREATE_IF_NOT_EXISTS</td>
106+
<td>Enum</td>
107+
<td>目标端建表行为。使用 <code>CREATE_IF_NOT_EXISTS</code> 自动创建缺失的 Fluss 表;使用 <code>ERROR_IF_NOT_EXISTS</code> 要求目标 Fluss 表必须提前存在。</td>
108+
</tr>
102109
<tr>
103110
<td>bucket.key</td>
104111
<td>optional</td>
@@ -138,6 +145,7 @@ Pipeline Connector Options
138145
* 支持 Fluss 主键表和日志表。
139146
140147
* 关于自动建表
148+
* 默认启用自动建表。如需要求目标表提前创建,可将 `schema.create.mode` 设置为 `ERROR_IF_NOT_EXISTS`。
141149
* 没有分区键
142150
* 桶数量由 `bucket.num` 选项控制
143151
* 数据分布由 `bucket.key` 选项控制。对于主键表,若未指定分桶键,则分桶键默认为主键(不含分区键);对于无主键的日志表,若未指定分桶键,则数据将随机分配到各个桶中。

docs/content/docs/connectors/pipeline-connectors/fluss.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,13 @@ Pipeline Connector Options
100100
<td>String</td>
101101
<td>The bootstrap servers for the Fluss sink connection. </td>
102102
</tr>
103+
<tr>
104+
<td>schema.create.mode</td>
105+
<td>optional</td>
106+
<td style="word-wrap: break-word;">CREATE_IF_NOT_EXISTS</td>
107+
<td>Enum</td>
108+
<td>Behavior for handling target schema creation. Use <code>CREATE_IF_NOT_EXISTS</code> to automatically create missing Fluss tables, or <code>ERROR_IF_NOT_EXISTS</code> to require target Fluss tables to already exist.</td>
109+
</tr>
103110
<tr>
104111
<td>bucket.key</td>
105112
<td>optional</td>
@@ -140,6 +147,7 @@ Pipeline Connector Options
140147
* Support Fluss primary key table and log table.
141148
142149
* For automatic table creation
150+
* Automatic table creation is enabled by default. To require pre-created target tables, set `schema.create.mode` to `ERROR_IF_NOT_EXISTS`.
143151
* There is no partition key
144152
* The number of buckets is controlled by `bucket.num`
145153
* The distribution keys are controlled by the `bucket.key` option. For a primary key table, if the bucket key is not specified, it defaults to the primary key (excluding the partition key). For a log table without a primary key, if the bucket key is not specified, the data is distributed to the buckets randomly.

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
2222
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
2323
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
24+
import org.apache.flink.cdc.common.pipeline.SchemaCreateMode;
2425
import org.apache.flink.cdc.common.utils.ChangeEventUtils;
2526
import org.apache.flink.cdc.common.utils.Preconditions;
2627
import org.apache.flink.cdc.common.utils.StringUtils;
@@ -53,6 +54,7 @@
5354
import java.util.stream.Stream;
5455

5556
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR;
57+
import static org.apache.flink.cdc.common.sink.SinkOptions.SCHEMA_CREATE_MODE;
5658
import static org.apache.flink.cdc.common.utils.ChangeEventUtils.resolveSchemaEvolutionOptions;
5759
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
5860

@@ -264,6 +266,9 @@ private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBe
264266
Map<String, String> sinkMap =
265267
mapper.convertValue(sinkNode, new TypeReference<Map<String, String>>() {});
266268

269+
SchemaCreateMode schemaCreateMode = Configuration.fromMap(sinkMap).get(SCHEMA_CREATE_MODE);
270+
sinkMap.remove(SCHEMA_CREATE_MODE.key());
271+
267272
// "type" field is required
268273
String type =
269274
checkNotNull(
@@ -274,7 +279,8 @@ private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBe
274279
// "name" field is optional
275280
String name = sinkMap.remove(NAME_KEY);
276281

277-
return new SinkDef(type, name, Configuration.fromMap(sinkMap), declaredSETypes);
282+
return new SinkDef(
283+
type, name, Configuration.fromMap(sinkMap), declaredSETypes, schemaCreateMode);
278284
}
279285

280286
private RouteDef toRouteDef(JsonNode routeNode) {

flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.flink.cdc.common.configuration.Configuration;
2121
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
2222
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
23+
import org.apache.flink.cdc.common.pipeline.SchemaCreateMode;
2324
import org.apache.flink.cdc.composer.definition.ModelDef;
2425
import org.apache.flink.cdc.composer.definition.PipelineDef;
2526
import org.apache.flink.cdc.composer.definition.RouteDef;
@@ -327,6 +328,52 @@ void testCreateTableAutoAddedToIncludedSchemaChanges() throws Exception {
327328
"lenient", "[add.column]", null, ImmutableSet.of(ADD_COLUMN, CREATE_TABLE));
328329
}
329330

331+
@Test
332+
void testSchemaCreateModeParsing() throws Exception {
333+
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
334+
335+
PipelineDef defaultPipelineDef =
336+
parser.parse(
337+
"source:\n"
338+
+ " type: foo\n"
339+
+ "sink:\n"
340+
+ " type: bar\n"
341+
+ " bootstrap.servers: localhost:9123\n",
342+
new Configuration());
343+
assertThat(defaultPipelineDef.getSink().getSchemaCreateMode())
344+
.isEqualTo(SchemaCreateMode.CREATE_IF_NOT_EXISTS);
345+
assertThat(defaultPipelineDef.getSink().getConfig().toMap())
346+
.containsEntry("bootstrap.servers", "localhost:9123")
347+
.doesNotContainKey("schema.create.mode");
348+
349+
PipelineDef errorIfNotExistsPipelineDef =
350+
parser.parse(
351+
"source:\n"
352+
+ " type: foo\n"
353+
+ "sink:\n"
354+
+ " type: bar\n"
355+
+ " schema.create.mode: ERROR_IF_NOT_EXISTS\n"
356+
+ " bootstrap.servers: localhost:9123\n",
357+
new Configuration());
358+
assertThat(errorIfNotExistsPipelineDef.getSink().getSchemaCreateMode())
359+
.isEqualTo(SchemaCreateMode.ERROR_IF_NOT_EXISTS);
360+
assertThat(errorIfNotExistsPipelineDef.getSink().getConfig().toMap())
361+
.containsEntry("bootstrap.servers", "localhost:9123")
362+
.doesNotContainKey("schema.create.mode");
363+
364+
assertThatThrownBy(
365+
() ->
366+
parser.parse(
367+
"source:\n"
368+
+ " type: foo\n"
369+
+ "sink:\n"
370+
+ " type: bar\n"
371+
+ " schema.create.mode: INVALID\n",
372+
new Configuration()))
373+
.isInstanceOf(IllegalArgumentException.class)
374+
.hasMessageContaining("schema.create.mode");
375+
}
376+
330377
private void testSchemaEvolutionTypesParsing(
331378
String behavior, String included, String excluded, Set<SchemaChangeEventType> expected)
332379
throws Exception {
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.pipeline;
19+
20+
import org.apache.flink.cdc.common.annotation.PublicEvolving;
21+
22+
/** Behavior for handling target schema creation when receiving a create table event. */
23+
@PublicEvolving
24+
public enum SchemaCreateMode {
25+
/** Create the target schema if it does not exist. */
26+
CREATE_IF_NOT_EXISTS,
27+
28+
/** Fail if the target schema does not exist. */
29+
ERROR_IF_NOT_EXISTS
30+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.sink;
19+
20+
import org.apache.flink.cdc.common.annotation.Internal;
21+
import org.apache.flink.cdc.common.event.CreateTableEvent;
22+
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
23+
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
24+
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
25+
import org.apache.flink.cdc.common.pipeline.SchemaCreateMode;
26+
import org.apache.flink.table.api.ValidationException;
27+
28+
import java.util.Set;
29+
30+
/** A {@link MetadataApplier} wrapper that applies the configured schema create mode. */
31+
@Internal
32+
public class SchemaCreateModeMetadataApplier implements MetadataApplier {
33+
34+
private final MetadataApplier delegate;
35+
private final SchemaCreateMode schemaCreateMode;
36+
37+
private SchemaCreateModeMetadataApplier(
38+
MetadataApplier delegate, SchemaCreateMode schemaCreateMode) {
39+
this.delegate = delegate;
40+
this.schemaCreateMode = schemaCreateMode;
41+
}
42+
43+
public static MetadataApplier wrap(
44+
MetadataApplier delegate, SchemaCreateMode schemaCreateMode, String sinkType) {
45+
if (schemaCreateMode == SchemaCreateMode.CREATE_IF_NOT_EXISTS) {
46+
return delegate;
47+
}
48+
if (!(delegate instanceof SupportsExistingSchemaValidation)) {
49+
throw new ValidationException(
50+
String.format(
51+
"Sink '%s' does not support '%s=%s'. The sink metadata applier must implement %s.",
52+
sinkType,
53+
SinkOptions.SCHEMA_CREATE_MODE.key(),
54+
schemaCreateMode,
55+
SupportsExistingSchemaValidation.class.getSimpleName()));
56+
}
57+
return new SchemaCreateModeMetadataApplier(delegate, schemaCreateMode);
58+
}
59+
60+
@Override
61+
public void applySchemaChange(SchemaChangeEvent schemaChangeEvent)
62+
throws SchemaEvolveException {
63+
if (schemaCreateMode == SchemaCreateMode.ERROR_IF_NOT_EXISTS
64+
&& schemaChangeEvent instanceof CreateTableEvent) {
65+
CreateTableEvent createTableEvent = (CreateTableEvent) schemaChangeEvent;
66+
SupportsExistingSchemaValidation validator =
67+
(SupportsExistingSchemaValidation) delegate;
68+
if (!validator.schemaExists(createTableEvent.tableId())) {
69+
throw new ValidationException(
70+
"Target schema does not exist: " + createTableEvent.tableId());
71+
}
72+
validator.validateExistingSchema(createTableEvent);
73+
return;
74+
}
75+
delegate.applySchemaChange(schemaChangeEvent);
76+
}
77+
78+
@Override
79+
public MetadataApplier setAcceptedSchemaEvolutionTypes(
80+
Set<SchemaChangeEventType> schemaEvolutionTypes) {
81+
delegate.setAcceptedSchemaEvolutionTypes(schemaEvolutionTypes);
82+
return this;
83+
}
84+
85+
@Override
86+
public boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType) {
87+
return delegate.acceptsSchemaEvolutionType(schemaChangeEventType);
88+
}
89+
90+
@Override
91+
public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
92+
return delegate.getSupportedSchemaEvolutionTypes();
93+
}
94+
95+
@Override
96+
public void close() throws Exception {
97+
delegate.close();
98+
}
99+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.sink;
19+
20+
import org.apache.flink.cdc.common.annotation.PublicEvolving;
21+
import org.apache.flink.cdc.common.configuration.ConfigOption;
22+
import org.apache.flink.cdc.common.configuration.ConfigOptions;
23+
import org.apache.flink.cdc.common.pipeline.SchemaCreateMode;
24+
25+
/** Common options for pipeline sinks. */
26+
@PublicEvolving
27+
public class SinkOptions {
28+
29+
public static final ConfigOption<SchemaCreateMode> SCHEMA_CREATE_MODE =
30+
ConfigOptions.key("schema.create.mode")
31+
.enumType(SchemaCreateMode.class)
32+
.defaultValue(SchemaCreateMode.CREATE_IF_NOT_EXISTS)
33+
.withDescription(
34+
"Behavior for handling target schema creation when receiving a create table event.");
35+
36+
private SinkOptions() {}
37+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.sink;
19+
20+
import org.apache.flink.cdc.common.annotation.PublicEvolving;
21+
import org.apache.flink.cdc.common.event.CreateTableEvent;
22+
import org.apache.flink.cdc.common.event.TableId;
23+
24+
/** Capability for sinks that can validate existing target schemas. */
25+
@PublicEvolving
26+
public interface SupportsExistingSchemaValidation {
27+
28+
/** Returns whether the target schema exists. */
29+
boolean schemaExists(TableId tableId);
30+
31+
/** Validates that the existing target schema is compatible with the create table event. */
32+
void validateExistingSchema(CreateTableEvent event);
33+
}

0 commit comments

Comments
 (0)