Skip to content

Commit 5c09e0b

Browse files
authored
[Fix-4486] Fix physical deletion events not being synchronized in PostgreSQL (#4525)
1 parent b7a242c commit 5c09e0b

3 files changed

Lines changed: 42 additions & 4 deletions

File tree

dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,16 @@
2828
import org.apache.commons.collections.CollectionUtils;
2929
import org.apache.commons.lang3.StringUtils;
3030
import org.apache.flink.streaming.api.datastream.DataStream;
31+
import org.apache.flink.table.api.Schema;
32+
import org.apache.flink.table.catalog.Column;
33+
import org.apache.flink.table.catalog.ResolvedSchema;
34+
import org.apache.flink.table.catalog.UniqueConstraint;
3135
import org.apache.flink.table.operations.Operation;
36+
import org.apache.flink.table.types.AtomicDataType;
3237
import org.apache.flink.types.Row;
3338

3439
import java.io.Serializable;
40+
import java.util.ArrayList;
3541
import java.util.List;
3642

3743
public class SQLSinkBuilder extends AbstractSqlSinkBuilder implements Serializable {
@@ -48,9 +54,19 @@ private SQLSinkBuilder(FlinkCDCConfig config) {
4854
private FlinkTableObjectIdentifier addSourceTableView(DataStream<Row> rowDataDataStream, Table table) {
4955
// Because view names in Flink are not allowed to contain '-', it is replaced with '_' here
5056
String viewName = replaceViewNameMiddleLineToUnderLine("VIEW_" + table.getSchemaTableNameWithUnderline());
51-
57+
final ResolvedSchema resolvedSchema =
58+
customTableEnvironment.fromChangelogStream(rowDataDataStream).getResolvedSchema();
59+
List<Column> columns = new ArrayList<>();
60+
for (Column column : resolvedSchema.getColumns()) {
61+
columns.add(column.copy(new AtomicDataType(
62+
column.getDataType().getLogicalType().copy(false),
63+
column.getDataType().getConversionClass())));
64+
}
65+
final UniqueConstraint primaryKey = UniqueConstraint.primaryKey(viewName + "_pk", table.getPrimaryKeys());
66+
final ResolvedSchema sinkSchema = new ResolvedSchema(columns, resolvedSchema.getWatermarkSpecs(), primaryKey);
67+
final Schema schema = Schema.newBuilder().fromResolvedSchema(sinkSchema).build();
5268
customTableEnvironment.createTemporaryView(
53-
viewName, customTableEnvironment.fromChangelogStream(rowDataDataStream));
69+
viewName, customTableEnvironment.fromChangelogStream(rowDataDataStream, schema));
5470
logger.info("Create {} temporaryView successful...", viewName);
5571
return FlinkTableObjectIdentifier.of(viewName);
5672
}

dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/catalog/SQLCatalogSinkBuilder.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,16 @@
2727
import org.dinky.data.model.Table;
2828

2929
import org.apache.flink.streaming.api.datastream.DataStream;
30+
import org.apache.flink.table.api.Schema;
31+
import org.apache.flink.table.catalog.Column;
32+
import org.apache.flink.table.catalog.ResolvedSchema;
33+
import org.apache.flink.table.catalog.UniqueConstraint;
34+
import org.apache.flink.table.types.AtomicDataType;
3035
import org.apache.flink.types.Row;
3136

3237
import java.io.Serializable;
38+
import java.util.ArrayList;
39+
import java.util.List;
3340

3441
public class SQLCatalogSinkBuilder extends AbstractSqlSinkBuilder implements Serializable {
3542

@@ -49,9 +56,19 @@ public void addTableSink(DataStream<Row> rowDataDataStream, Table table) {
4956
String tableName = getSinkTableName(table);
5057
// Because view names in Flink are not allowed to contain '-', it is replaced with '_' here
5158
String viewName = replaceViewNameMiddleLineToUnderLine("VIEW_" + table.getSchemaTableNameWithUnderline());
52-
59+
final ResolvedSchema resolvedSchema =
60+
customTableEnvironment.fromChangelogStream(rowDataDataStream).getResolvedSchema();
61+
List<Column> columns = new ArrayList<>();
62+
for (Column column : resolvedSchema.getColumns()) {
63+
columns.add(column.copy(new AtomicDataType(
64+
column.getDataType().getLogicalType().copy(false),
65+
column.getDataType().getConversionClass())));
66+
}
67+
final UniqueConstraint primaryKey = UniqueConstraint.primaryKey(viewName + "_pk", table.getPrimaryKeys());
68+
final ResolvedSchema sinkSchema = new ResolvedSchema(columns, resolvedSchema.getWatermarkSpecs(), primaryKey);
69+
final Schema schema = Schema.newBuilder().fromResolvedSchema(sinkSchema).build();
5370
customTableEnvironment.createTemporaryView(
54-
viewName, customTableEnvironment.fromChangelogStream(rowDataDataStream));
71+
viewName, customTableEnvironment.fromChangelogStream(rowDataDataStream, schema));
5572
logger.info("Create {} temporaryView successful...", viewName);
5673

5774
createInsertOperations(

dinky-common/src/main/java/org/dinky/data/model/Table.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ public static Table build(String name, String schema, List<Column> columns) {
107107
return new Table(name, schema, columns);
108108
}
109109

110+
@Transient
111+
public List<String> getPrimaryKeys() {
112+
return columns.stream().filter(Column::isKeyFlag).map(Column::getName).collect(Collectors.toList());
113+
}
114+
110115
@Transient
111116
public String getFlinkTableWith(String flinkConfig) {
112117
if (Asserts.isNotNullString(flinkConfig)) {

0 commit comments

Comments
 (0)