Skip to content

Commit 259496b

Browse files
author
litiliu
committed
Support disabling automatic table creation and fail when target table does not exist
1 parent 343778f commit 259496b

18 files changed

Lines changed: 817 additions & 29 deletions

File tree

docs/content.zh/docs/connectors/pipeline-connectors/fluss.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,13 @@ Pipeline Connector Options
9999
<td>String</td>
100100
<td>用于建立与 Fluss 集群初始连接的主机/端口对列表。 </td>
101101
</tr>
102+
<tr>
103+
<td>target.table.create.mode</td>
104+
<td>optional</td>
105+
<td style="word-wrap: break-word;">CREATE_IF_NOT_EXISTS</td>
106+
<td>Enum</td>
107+
<td>目标端建表行为。使用 <code>CREATE_IF_NOT_EXISTS</code> 自动创建缺失的 Fluss 表;使用 <code>ERROR_IF_NOT_EXISTS</code> 要求目标 Fluss 表必须提前存在,并校验 schema。</td>
108+
</tr>
102109
<tr>
103110
<td>bucket.key</td>
104111
<td>optional</td>
@@ -138,6 +145,8 @@ Pipeline Connector Options
138145
* 支持 Fluss 主键表和日志表。
139146
140147
* 关于自动建表
148+
* 默认通过 `target.table.create.mode: CREATE_IF_NOT_EXISTS` 启用自动建表。
149+
* 如需在 Flink CDC 外部提前创建目标表,可将 `target.table.create.mode` 设置为 `ERROR_IF_NOT_EXISTS`。该模式下目标表必须已存在,且 schema 必须与源表匹配。
141150
* 没有分区键
142151
* 桶数量由 `bucket.num` 选项控制
143152
* 数据分布由 `bucket.key` 选项控制。对于主键表,若未指定分桶键,则分桶键默认为主键(不含分区键);对于无主键的日志表,若未指定分桶键,则数据将随机分配到各个桶中。

docs/content.zh/docs/connectors/pipeline-connectors/maxcompute.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,13 @@ pipeline:
9393
<td>String</td>
9494
<td>Sink 的名称.</td>
9595
</tr>
96+
<tr>
97+
<td>target.table.create.mode</td>
98+
<td>optional</td>
99+
<td style="word-wrap: break-word;">CREATE_IF_NOT_EXISTS</td>
100+
<td>Enum</td>
101+
<td>目标端建表行为。使用 <code>CREATE_IF_NOT_EXISTS</code> 自动创建缺失的 MaxCompute 表;使用 <code>ERROR_IF_NOT_EXISTS</code> 要求目标 MaxCompute 表必须提前存在,并校验 schema 和主键。</td>
102+
</tr>
96103
<tr>
97104
<td>access-id</td>
98105
<td>required</td>
@@ -198,6 +205,8 @@ pipeline:
198205
199206
* 链接器 支持自动建表,将MaxCompute表与源表的位置关系、数据类型进行自动映射(参见下文映射表),当源表有主键时,自动创建
200207
MaxCompute Delta 表,否则创建普通 MaxCompute 表(Append表)
208+
* 默认通过 `target.table.create.mode: CREATE_IF_NOT_EXISTS` 启用自动建表。
209+
* 如需在 Flink CDC 外部提前创建目标表,可将 `target.table.create.mode` 设置为 `ERROR_IF_NOT_EXISTS`。该模式下目标表必须已存在,且 schema 和主键必须与源表匹配。
201210
* 当写入普通 MaxCompute 表(Append表)时,会忽略`delete`操作,`update`操作会被视为`insert`操作
202211
* 目前仅支持 at-least-once,Delta 表由于主键特性能够实现幂等写
203212
* 对于表结构变更同步

docs/content/docs/connectors/pipeline-connectors/fluss.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,13 @@ Pipeline Connector Options
100100
<td>String</td>
101101
<td>The bootstrap servers for the Fluss sink connection. </td>
102102
</tr>
103+
<tr>
104+
<td>target.table.create.mode</td>
105+
<td>optional</td>
106+
<td style="word-wrap: break-word;">CREATE_IF_NOT_EXISTS</td>
107+
<td>Enum</td>
108+
<td>Behavior for handling target table creation. Use <code>CREATE_IF_NOT_EXISTS</code> to automatically create missing Fluss tables, or <code>ERROR_IF_NOT_EXISTS</code> to require target Fluss tables to already exist and validate their schema.</td>
109+
</tr>
103110
<tr>
104111
<td>bucket.key</td>
105112
<td>optional</td>
@@ -140,6 +147,8 @@ Pipeline Connector Options
140147
* Support Fluss primary key table and log table.
141148
142149
* For creating table automatically
150+
* Automatic table creation is enabled by default with `target.table.create.mode: CREATE_IF_NOT_EXISTS`.
151+
* To require target tables to be created outside Flink CDC, set `target.table.create.mode` to `ERROR_IF_NOT_EXISTS`. In this mode, the target table must already exist, and its schema must match the source table.
143152
* There is no partition key
144153
* The number of buckets is controlled by `bucket.num`
145154
* The distribution keys are controlled by option `bucket.key`. For primary key table and a bucket key is not specified, the bucket key will be used as primary key(excluding the partition key). For log table has no primary key and the bucket key is not specified, the data will be distributed to each bucket randomly.

docs/content/docs/connectors/pipeline-connectors/maxcompute.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,13 @@ pipeline:
9393
<td>String</td>
9494
<td>The name of the sink.</td>
9595
</tr>
96+
<tr>
97+
<td>target.table.create.mode</td>
98+
<td>optional</td>
99+
<td style="word-wrap: break-word;">CREATE_IF_NOT_EXISTS</td>
100+
<td>Enum</td>
101+
<td>Behavior for handling target table creation. Use <code>CREATE_IF_NOT_EXISTS</code> to automatically create missing MaxCompute tables, or <code>ERROR_IF_NOT_EXISTS</code> to require target MaxCompute tables to already exist and validate their schema and primary key.</td>
102+
</tr>
96103
<tr>
97104
<td>access-id</td>
98105
<td>required</td>
@@ -193,6 +200,8 @@ pipeline:
193200
## Usage Instructions
194201
195202
* The connector supports automatic table creation, automatically mapping the location relationship and data types between MaxCompute tables and source tables (see the mapping table below). When the source table has a primary key, a MaxCompute Delta table is automatically created; otherwise, a regular MaxCompute table (Append table) is created.
203+
* Automatic table creation is enabled by default with `target.table.create.mode: CREATE_IF_NOT_EXISTS`.
204+
* To require target tables to be created outside Flink CDC, set `target.table.create.mode` to `ERROR_IF_NOT_EXISTS`. In this mode, the target table must already exist, and its schema and primary key must match the source table.
196205
* When writing to a regular MaxCompute table (Append table), the delete operation will be ignored, and the update operation will be treated as an insert operation.
197206
* Currently, only at-least-once is supported. Delta tables can achieve idempotent writes due to their primary key characteristics.
198207
* For synchronization of table structure changes:

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
2222
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
2323
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
24+
import org.apache.flink.cdc.common.pipeline.TargetTableCreateMode;
2425
import org.apache.flink.cdc.common.utils.ChangeEventUtils;
2526
import org.apache.flink.cdc.common.utils.Preconditions;
2627
import org.apache.flink.cdc.common.utils.StringUtils;
@@ -53,6 +54,7 @@
5354
import java.util.stream.Stream;
5455

5556
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR;
57+
import static org.apache.flink.cdc.common.sink.SinkOptions.TARGET_TABLE_CREATE_MODE;
5658
import static org.apache.flink.cdc.common.utils.ChangeEventUtils.resolveSchemaEvolutionOptions;
5759
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
5860

@@ -264,6 +266,10 @@ private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBe
264266
Map<String, String> sinkMap =
265267
mapper.convertValue(sinkNode, new TypeReference<Map<String, String>>() {});
266268

269+
TargetTableCreateMode targetTableCreateMode =
270+
Configuration.fromMap(sinkMap).get(TARGET_TABLE_CREATE_MODE);
271+
sinkMap.remove(TARGET_TABLE_CREATE_MODE.key());
272+
267273
// "type" field is required
268274
String type =
269275
checkNotNull(
@@ -274,7 +280,8 @@ private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBe
274280
// "name" field is optional
275281
String name = sinkMap.remove(NAME_KEY);
276282

277-
return new SinkDef(type, name, Configuration.fromMap(sinkMap), declaredSETypes);
283+
return new SinkDef(
284+
type, name, Configuration.fromMap(sinkMap), declaredSETypes, targetTableCreateMode);
278285
}
279286

280287
private RouteDef toRouteDef(JsonNode routeNode) {

flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.flink.cdc.common.configuration.Configuration;
2121
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
2222
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
23+
import org.apache.flink.cdc.common.pipeline.TargetTableCreateMode;
2324
import org.apache.flink.cdc.composer.definition.ModelDef;
2425
import org.apache.flink.cdc.composer.definition.PipelineDef;
2526
import org.apache.flink.cdc.composer.definition.RouteDef;
@@ -327,6 +328,52 @@ void testCreateTableAutoAddedToIncludedSchemaChanges() throws Exception {
327328
"lenient", "[add.column]", null, ImmutableSet.of(ADD_COLUMN, CREATE_TABLE));
328329
}
329330

331+
@Test
332+
void testTargetTableCreateModeParsing() throws Exception {
333+
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
334+
335+
PipelineDef defaultPipelineDef =
336+
parser.parse(
337+
"source:\n"
338+
+ " type: foo\n"
339+
+ "sink:\n"
340+
+ " type: bar\n"
341+
+ " bootstrap.servers: localhost:9123\n",
342+
new Configuration());
343+
assertThat(defaultPipelineDef.getSink().getTargetTableCreateMode())
344+
.isEqualTo(TargetTableCreateMode.CREATE_IF_NOT_EXISTS);
345+
assertThat(defaultPipelineDef.getSink().getConfig().toMap())
346+
.containsEntry("bootstrap.servers", "localhost:9123")
347+
.doesNotContainKey("target.table.create.mode");
348+
349+
PipelineDef errorIfNotExistsPipelineDef =
350+
parser.parse(
351+
"source:\n"
352+
+ " type: foo\n"
353+
+ "sink:\n"
354+
+ " type: bar\n"
355+
+ " target.table.create.mode: ERROR_IF_NOT_EXISTS\n"
356+
+ " bootstrap.servers: localhost:9123\n",
357+
new Configuration());
358+
assertThat(errorIfNotExistsPipelineDef.getSink().getTargetTableCreateMode())
359+
.isEqualTo(TargetTableCreateMode.ERROR_IF_NOT_EXISTS);
360+
assertThat(errorIfNotExistsPipelineDef.getSink().getConfig().toMap())
361+
.containsEntry("bootstrap.servers", "localhost:9123")
362+
.doesNotContainKey("target.table.create.mode");
363+
364+
assertThatThrownBy(
365+
() ->
366+
parser.parse(
367+
"source:\n"
368+
+ " type: foo\n"
369+
+ "sink:\n"
370+
+ " type: bar\n"
371+
+ " target.table.create.mode: INVALID\n",
372+
new Configuration()))
373+
.isInstanceOf(IllegalArgumentException.class)
374+
.hasMessageContaining("target.table.create.mode");
375+
}
376+
330377
private void testSchemaEvolutionTypesParsing(
331378
String behavior, String included, String excluded, Set<SchemaChangeEventType> expected)
332379
throws Exception {
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.pipeline;
19+
20+
import org.apache.flink.cdc.common.annotation.PublicEvolving;
21+
22+
/** Behavior for handling target table creation when receiving a create table event. */
23+
@PublicEvolving
24+
public enum TargetTableCreateMode {
25+
/** Create the target table if it does not exist. */
26+
CREATE_IF_NOT_EXISTS,
27+
28+
/** Fail if the target table does not exist. */
29+
ERROR_IF_NOT_EXISTS
30+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.sink;
19+
20+
import org.apache.flink.cdc.common.annotation.PublicEvolving;
21+
import org.apache.flink.cdc.common.configuration.ConfigOption;
22+
import org.apache.flink.cdc.common.configuration.ConfigOptions;
23+
import org.apache.flink.cdc.common.pipeline.TargetTableCreateMode;
24+
25+
/** Common options for pipeline sinks. */
26+
@PublicEvolving
27+
public class SinkOptions {
28+
29+
public static final ConfigOption<TargetTableCreateMode> TARGET_TABLE_CREATE_MODE =
30+
ConfigOptions.key("target.table.create.mode")
31+
.enumType(TargetTableCreateMode.class)
32+
.defaultValue(TargetTableCreateMode.CREATE_IF_NOT_EXISTS)
33+
.withDescription(
34+
"Behavior for handling target table creation when receiving a create table event.");
35+
36+
private SinkOptions() {}
37+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.sink;
19+
20+
import org.apache.flink.cdc.common.annotation.PublicEvolving;
21+
import org.apache.flink.cdc.common.event.TableId;
22+
23+
/** Capability for sinks that can check whether a target table already exists. */
24+
@PublicEvolving
25+
public interface SupportsTargetTableExistenceCheck {
26+
27+
/** Returns whether the target table exists. */
28+
boolean targetTableExists(TableId tableId);
29+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.sink;
19+
20+
import org.apache.flink.cdc.common.annotation.Internal;
21+
import org.apache.flink.cdc.common.event.CreateTableEvent;
22+
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
23+
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
24+
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
25+
import org.apache.flink.cdc.common.pipeline.TargetTableCreateMode;
26+
import org.apache.flink.table.api.ValidationException;
27+
28+
import java.util.Set;
29+
30+
/** A {@link MetadataApplier} wrapper that applies the configured target table create mode. */
31+
@Internal
32+
public class TargetTableCreateModeMetadataApplier implements MetadataApplier {
33+
34+
private final MetadataApplier delegate;
35+
private final TargetTableCreateMode targetTableCreateMode;
36+
37+
private TargetTableCreateModeMetadataApplier(
38+
MetadataApplier delegate, TargetTableCreateMode targetTableCreateMode) {
39+
this.delegate = delegate;
40+
this.targetTableCreateMode = targetTableCreateMode;
41+
}
42+
43+
public static MetadataApplier wrap(
44+
MetadataApplier delegate, TargetTableCreateMode targetTableCreateMode, String sinkType) {
45+
if (targetTableCreateMode == TargetTableCreateMode.CREATE_IF_NOT_EXISTS) {
46+
return delegate;
47+
}
48+
if (!(delegate instanceof SupportsTargetTableExistenceCheck)) {
49+
throw new ValidationException(
50+
String.format(
51+
"Sink '%s' does not support '%s=%s'. The sink metadata applier must implement %s.",
52+
sinkType,
53+
SinkOptions.TARGET_TABLE_CREATE_MODE.key(),
54+
targetTableCreateMode,
55+
SupportsTargetTableExistenceCheck.class.getSimpleName()));
56+
}
57+
return new TargetTableCreateModeMetadataApplier(delegate, targetTableCreateMode);
58+
}
59+
60+
@Override
61+
public void applySchemaChange(SchemaChangeEvent schemaChangeEvent)
62+
throws SchemaEvolveException {
63+
if (targetTableCreateMode == TargetTableCreateMode.ERROR_IF_NOT_EXISTS
64+
&& schemaChangeEvent instanceof CreateTableEvent) {
65+
CreateTableEvent createTableEvent = (CreateTableEvent) schemaChangeEvent;
66+
SupportsTargetTableExistenceCheck existenceCheck =
67+
(SupportsTargetTableExistenceCheck) delegate;
68+
if (!existenceCheck.targetTableExists(createTableEvent.tableId())) {
69+
throw new ValidationException(
70+
"Target table does not exist: " + createTableEvent.tableId());
71+
}
72+
}
73+
delegate.applySchemaChange(schemaChangeEvent);
74+
}
75+
76+
@Override
77+
public MetadataApplier setAcceptedSchemaEvolutionTypes(
78+
Set<SchemaChangeEventType> schemaEvolutionTypes) {
79+
delegate.setAcceptedSchemaEvolutionTypes(schemaEvolutionTypes);
80+
return this;
81+
}
82+
83+
@Override
84+
public boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType) {
85+
return delegate.acceptsSchemaEvolutionType(schemaChangeEventType);
86+
}
87+
88+
@Override
89+
public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
90+
return delegate.getSupportedSchemaEvolutionTypes();
91+
}
92+
93+
@Override
94+
public void close() throws Exception {
95+
delegate.close();
96+
}
97+
}

0 commit comments

Comments
 (0)