Skip to content

Commit 56f5a77

Browse files
author
litiliu
committed
Support disabling automatic table creation and fail when target table does not exist
1 parent 343778f commit 56f5a77

8 files changed

Lines changed: 99 additions & 3 deletions

File tree

docs/content.zh/docs/connectors/pipeline-connectors/fluss.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,13 @@ Pipeline Connector Options
9999
<td>String</td>
100100
<td>用于建立与 Fluss 集群初始连接的主机/端口对列表。 </td>
101101
</tr>
102+
<tr>
103+
<td>auto.create.table.enabled</td>
104+
<td>optional</td>
105+
<td style="word-wrap: break-word;">true</td>
106+
<td>Boolean</td>
107+
<td>当目标表不存在时,是否自动创建 Fluss table。如果禁用该选项,Pipeline 启动前必须提前创建好目标 Fluss table,否则 Pipeline 会失败。</td>
108+
</tr>
102109
<tr>
103110
<td>bucket.key</td>
104111
<td>optional</td>
@@ -138,6 +145,7 @@ Pipeline Connector Options
138145
* 支持 Fluss 主键表和日志表。
139146
140147
* 关于自动建表
148+
* 默认启用自动建表。如需要求目标表提前创建,可将 `auto.create.table.enabled` 设置为 `false`。
141149
* 没有分区键
142150
* 桶数量由 `bucket.num` 选项控制
143151
* 数据分布由 `bucket.key` 选项控制。对于主键表,若未指定分桶键,则分桶键默认为主键(不含分区键);对于无主键的日志表,若未指定分桶键,则数据将随机分配到各个桶中。

docs/content/docs/connectors/pipeline-connectors/fluss.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,13 @@ Pipeline Connector Options
100100
<td>String</td>
101101
<td>The bootstrap servers for the Fluss sink connection. </td>
102102
</tr>
103+
<tr>
104+
<td>auto.create.table.enabled</td>
105+
<td>optional</td>
106+
<td style="word-wrap: break-word;">true</td>
107+
<td>Boolean</td>
108+
<td>Whether to automatically create the Fluss table when the target table does not exist. If disabled, the target Fluss table must already exist before the pipeline starts, otherwise the pipeline will fail.</td>
109+
</tr>
103110
<tr>
104111
<td>bucket.key</td>
105112
<td>optional</td>
@@ -140,6 +147,7 @@ Pipeline Connector Options
140147
* Support Fluss primary key table and log table.
141148
142149
* For creating table automatically
150+
* Automatic table creation is enabled by default. To require pre-created target tables, set `auto.create.table.enabled` to `false`.
143151
* There is no partition key
144152
* The number of buckets is controlled by `bucket.num`
145153
* The distribution keys are controlled by option `bucket.key`. For primary key table and a bucket key is not specified, the bucket key will be used as primary key(excluding the partition key). For log table has no primary key and the bucket key is not specified, the data will be distributed to each bucket randomly.

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/factory/FlussDataSinkFactory.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.Map;
3333
import java.util.Set;
3434

35+
import static org.apache.flink.cdc.connectors.fluss.sink.FlussDataSinkOptions.AUTO_CREATE_TABLE_ENABLED;
3536
import static org.apache.flink.cdc.connectors.fluss.sink.FlussDataSinkOptions.BOOTSTRAP_SERVERS;
3637
import static org.apache.flink.cdc.connectors.fluss.sink.FlussDataSinkOptions.BUCKET_KEY;
3738
import static org.apache.flink.cdc.connectors.fluss.sink.FlussDataSinkOptions.BUCKET_NUMBER;
@@ -57,7 +58,12 @@ public DataSink createDataSink(Context context) {
5758
parseBucketKeys(factoryConfiguration.get(BUCKET_KEY));
5859
Map<String, Integer> bucketNumMap =
5960
parseBucketNumber(factoryConfiguration.get(BUCKET_NUMBER));
60-
return new FlussDataSink(flussClientConfig, tableProperties, bucketKeysMap, bucketNumMap);
61+
return new FlussDataSink(
62+
flussClientConfig,
63+
tableProperties,
64+
bucketKeysMap,
65+
bucketNumMap,
66+
factoryConfiguration.get(AUTO_CREATE_TABLE_ENABLED));
6167
}
6268

6369
@Override
@@ -75,6 +81,7 @@ public Set<ConfigOption<?>> requiredOptions() {
7581
@Override
7682
public Set<ConfigOption<?>> optionalOptions() {
7783
Set<ConfigOption<?>> options = new HashSet<>();
84+
options.add(AUTO_CREATE_TABLE_ENABLED);
7885
options.add(BUCKET_KEY);
7986
options.add(BUCKET_NUMBER);
8087
return options;

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussDataSink.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,16 +37,19 @@ public class FlussDataSink implements DataSink {
3737
private final Map<String, String> tableProperties;
3838
private final Map<String, List<String>> bucketKeysMap;
3939
private final Map<String, Integer> bucketNumMap;
40+
private final boolean autoCreateTableEnabled;
4041

4142
public FlussDataSink(
4243
Configuration flussClientConfig,
4344
Map<String, String> tableProperties,
4445
Map<String, List<String>> bucketKeysMap,
45-
Map<String, Integer> bucketNumMap) {
46+
Map<String, Integer> bucketNumMap,
47+
boolean autoCreateTableEnabled) {
4648
this.flussClientConfig = flussClientConfig;
4749
this.tableProperties = tableProperties;
4850
this.bucketKeysMap = bucketKeysMap;
4951
this.bucketNumMap = bucketNumMap;
52+
this.autoCreateTableEnabled = autoCreateTableEnabled;
5053
}
5154

5255
@Override
@@ -58,7 +61,11 @@ public EventSinkProvider getEventSinkProvider() {
5861
@Override
5962
public MetadataApplier getMetadataApplier() {
6063
return new FlussMetaDataApplier(
61-
flussClientConfig, tableProperties, bucketKeysMap, bucketNumMap);
64+
flussClientConfig,
65+
tableProperties,
66+
bucketKeysMap,
67+
bucketNumMap,
68+
autoCreateTableEnabled);
6269
}
6370

6471
@Override

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussDataSinkOptions.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,13 @@ public class FlussDataSinkOptions {
3333
.noDefaultValue()
3434
.withDescription("The bootstrap servers for the Fluss sink connection.");
3535

36+
public static final ConfigOption<Boolean> AUTO_CREATE_TABLE_ENABLED =
37+
ConfigOptions.key("auto.create.table.enabled")
38+
.booleanType()
39+
.defaultValue(true)
40+
.withDescription(
41+
"Whether to automatically create the Fluss table when the target table does not exist.");
42+
3643
public static final ConfigOption<String> BUCKET_KEY =
3744
ConfigOptions.key("bucket.key")
3845
.stringType()

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050

5151
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
5252
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE;
53+
import static org.apache.flink.cdc.connectors.fluss.sink.FlussDataSinkOptions.AUTO_CREATE_TABLE_ENABLED;
5354
import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.toFlussTable;
5455
import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.toFlussType;
5556

@@ -60,6 +61,7 @@ public class FlussMetaDataApplier implements MetadataApplier {
6061
private final Map<String, String> tableProperties;
6162
private final Map<String, List<String>> bucketKeysMap;
6263
private final Map<String, Integer> bucketNumMap;
64+
private final boolean autoCreateTableEnabled;
6365
private Set<SchemaChangeEventType> enabledEventTypes =
6466
new HashSet<>(Arrays.asList(CREATE_TABLE, DROP_TABLE));
6567

@@ -68,10 +70,25 @@ public FlussMetaDataApplier(
6870
Map<String, String> tableProperties,
6971
Map<String, List<String>> bucketKeysMap,
7072
Map<String, Integer> bucketNumMap) {
73+
this(
74+
flussClientConfig,
75+
tableProperties,
76+
bucketKeysMap,
77+
bucketNumMap,
78+
AUTO_CREATE_TABLE_ENABLED.defaultValue());
79+
}
80+
81+
public FlussMetaDataApplier(
82+
Configuration flussClientConfig,
83+
Map<String, String> tableProperties,
84+
Map<String, List<String>> bucketKeysMap,
85+
Map<String, Integer> bucketNumMap,
86+
boolean autoCreateTableEnabled) {
7187
this.flussClientConfig = flussClientConfig;
7288
this.tableProperties = tableProperties;
7389
this.bucketKeysMap = bucketKeysMap;
7490
this.bucketNumMap = bucketNumMap;
91+
this.autoCreateTableEnabled = autoCreateTableEnabled;
7592
}
7693

7794
@Override
@@ -122,18 +139,32 @@ private void applyCreateTable(CreateTableEvent event) {
122139
toFlussTable(event.getSchema(), bucketKeys, bucketNum, tableProperties);
123140
admin.createDatabase(tablePath.getDatabaseName(), DatabaseDescriptor.EMPTY, true);
124141
if (!admin.tableExists(tablePath).get()) {
142+
if (!autoCreateTableEnabled) {
143+
throwMissingTableException(tableIdentifier);
144+
}
125145
admin.createTable(tablePath, inferredFlussTable, false).get();
126146
} else {
127147
TableInfo currentTableInfo = admin.getTableInfo(tablePath).get();
128148
// sanity check to prevent unexpected table schema evolution.
129149
sanityCheck(inferredFlussTable, currentTableInfo);
130150
}
151+
} catch (ValidationException e) {
152+
throw e;
131153
} catch (Exception e) {
132154
LOG.error("Failed to apply schema change {}", event, e);
133155
throw new RuntimeException(e);
134156
}
135157
}
136158

159+
private void throwMissingTableException(String tableIdentifier) {
160+
throw new ValidationException(
161+
"Target Fluss table "
162+
+ tableIdentifier
163+
+ " does not exist and "
164+
+ AUTO_CREATE_TABLE_ENABLED.key()
165+
+ " is false.");
166+
}
167+
137168
private void applyDropTable(DropTableEvent event) {
138169
try (Connection connection = ConnectionFactory.createConnection(flussClientConfig);
139170
Admin admin = connection.getAdmin()) {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/factory/FlussDataSinkFactoryTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ private Configuration createValidConfiguration() {
147147
return Configuration.fromMap(
148148
ImmutableMap.<String, String>builder()
149149
.put(FlussDataSinkOptions.BOOTSTRAP_SERVERS.key(), "localhost:9123")
150+
.put(FlussDataSinkOptions.AUTO_CREATE_TABLE_ENABLED.key(), "false")
150151
.put(FlussDataSinkOptions.BUCKET_KEY.key(), "database1.table1:a,b")
151152
.put(FlussDataSinkOptions.BUCKET_NUMBER.key(), "database1.table1:2")
152153
.build());

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.cdc.common.schema.Schema;
2626
import org.apache.flink.cdc.common.types.DataTypes;
2727
import org.apache.flink.cdc.common.types.IntType;
28+
import org.apache.flink.table.api.ValidationException;
2829

2930
import org.apache.fluss.client.Connection;
3031
import org.apache.fluss.client.ConnectionFactory;
@@ -605,6 +606,32 @@ void testRecreateTableWithDifferentSchema() throws Exception {
605606
}
606607
}
607608

609+
@Test
610+
void testCreateTableWithAutoCreateTableDisabled() throws Exception {
611+
TableId tableId = TableId.tableId("default_namespace", DATABASE_NAME, "table1");
612+
TablePath tablePath = new TablePath(DATABASE_NAME, "table1");
613+
Schema schema =
614+
Schema.newBuilder()
615+
.physicalColumn("id", DataTypes.INT())
616+
.physicalColumn("name", DataTypes.STRING())
617+
.build();
618+
CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
619+
620+
try (FlussMetaDataApplier applier =
621+
new FlussMetaDataApplier(
622+
FLUSS_CLUSTER_EXTENSION.getClientConfig(),
623+
Collections.emptyMap(),
624+
Collections.emptyMap(),
625+
Collections.emptyMap(),
626+
false)) {
627+
assertThatThrownBy(() -> applier.applySchemaChange(createTableEvent))
628+
.isExactlyInstanceOf(ValidationException.class)
629+
.hasMessageContaining("auto.create.table.enabled is false");
630+
assertThat(admin.databaseExists(DATABASE_NAME).get()).isTrue();
631+
assertThat(admin.tableExists(tablePath).get()).isFalse();
632+
}
633+
}
634+
608635
@Test
609636
void testCreateTableWithTableOptions() throws Exception {
610637
Schema schema =

0 commit comments

Comments
 (0)