Skip to content

Commit 8ef4d7f

Browse files
committed
支持按照SR表的字段顺序写入数据
1 parent f60eb08 commit 8ef4d7f

4 files changed

Lines changed: 95 additions & 6 deletions

File tree

dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/postgres/PostgresCDCBuilder.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,29 @@ public String generateUrl(String schema) {
118118
protected String getMetadataType() {
119119
return METADATA_TYPE;
120120
}
121+
122+
@Override
123+
public Map<String, String> parseMetaDataConfig() {
124+
String url = String.format(
125+
"jdbc:postgres://%s:%d/%s",
126+
config.getHostname(), config.getPort(), composeJdbcProperties(config.getJdbc()));
127+
return parseMetaDataSingleConfig(url);
128+
}
129+
130+
private String composeJdbcProperties(Map<String, String> jdbcProperties) {
131+
if (jdbcProperties == null || jdbcProperties.isEmpty()) {
132+
return "";
133+
}
134+
135+
StringBuilder sb = new StringBuilder();
136+
sb.append('?');
137+
jdbcProperties.forEach((k, v) -> {
138+
sb.append(k);
139+
sb.append("=");
140+
sb.append(v);
141+
sb.append("&");
142+
});
143+
sb.deleteCharAt(sb.length() - 1);
144+
return sb.toString();
145+
}
121146
}

dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,16 @@ private List<Operation> addSinkInsert(
100100
FlinkTableObjectIdentifier targetTable,
101101
String sinkSchemaName,
102102
FlinkTableObjectIdentifier sinkTable) {
103-
String pkList = StringUtils.join(getPKList(table), ".");
104-
String flinkDDL = FlinkStatementUtil.getFlinkDDL(table, targetTable, config, sinkSchemaName, sinkTable, pkList);
103+
Table sinkTableObject = table.getSinkTable();
104+
if (sinkTableObject == null) {
105+
sinkTableObject = table;
106+
}
107+
String pkList = StringUtils.join(getPKList(sinkTableObject), ".");
108+
String flinkDDL = FlinkStatementUtil.getFlinkDDL(sinkTableObject, targetTable, config, sinkSchemaName, sinkTable, pkList);
105109
logger.info(flinkDDL);
106110
customTableEnvironment.executeSql(flinkDDL);
107111
logger.info("Create {} FlinkSQL DDL successful...", targetTable);
108-
return createInsertOperations(table, sourceTable, targetTable);
112+
return createInsertOperations(sinkTableObject, sourceTable, targetTable);
109113
}
110114

111115
@Override

dinky-common/src/main/java/org/dinky/data/model/Table.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ public class Table implements Serializable, Comparable<Table>, Cloneable {
6767

6868
private List<Column> columns;
6969

70+
/** The sink table for the source table */
71+
private Table sinkTable;
72+
7073
/** 驱动类型, @see org.dinky.metadata.enums.DriverType */
7174
private String driverType;
7275

dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,9 @@ public TableResult execute(Executor executor) {
129129
String tableName = schemaTableName.split("\\.")[1];
130130
table.setColumns(driver.listColumnsSortByPK(realSchemaName, tableName));
131131
schemaList.add(schema);
132-
132+
Driver sinkRealDriver = getDriver(config, schemaName);
133+
final List<Table> sinkTables= getSinkTables(config, schemaName);
134+
setSinkTable(table, sinkTables, sinkBuilder, sinkRealDriver);
133135
if (null != sinkDriver) {
134136
final String createTableOptions = config.getSink().get(FlinkCDCConfig.AUTO_CREATE_OPTIONS);
135137
Table sinkTable = (Table) table.clone();
@@ -151,6 +153,8 @@ public TableResult execute(Executor executor) {
151153
Driver driver = Driver.build(confMap.get("name"), confMap.get("type"), JsonUtils.toMap(confMap));
152154

153155
final List<Table> tables = driver.listTables(schemaName);
156+
Driver sinkRealDriver = getDriver(config, schemaName);
157+
final List<Table> sinkTables= getSinkTables(config, schemaName);
154158
for (Table table : tables) {
155159
if (!Asserts.isEquals(table.getType(), "VIEW")) {
156160
if (Asserts.isNotNullCollection(tableRegList)) {
@@ -160,13 +164,15 @@ public TableResult execute(Executor executor) {
160164
table.setColumns(driver.listColumnsSortByPK(schemaName, table.getName()));
161165
schema.getTables().add(table);
162166
schemaTableNameList.add(table.getSchemaTableName());
167+
setSinkTable(table, sinkTables, sinkBuilder, sinkRealDriver);
163168
break;
164169
}
165170
}
166171
} else {
167172
table.setColumns(driver.listColumnsSortByPK(schemaName, table.getName()));
168173
schemaTableNameList.add(table.getSchemaTableName());
169174
schema.getTables().add(table);
175+
setSinkTable(table, sinkTables, sinkBuilder, sinkRealDriver);
170176
}
171177
}
172178
}
@@ -223,19 +229,70 @@ public TableResult execute(Executor executor) {
223229
return tableResultBuilder.build();
224230
}
225231

232+
private static void setSinkTable(Table table, List<Table> sinkTables, SinkBuilder sinkBuilder, Driver sinkRealDriver) {
233+
String sinkTableName = sinkBuilder.getSinkTableName(table);
234+
for (Table sinkTable : sinkTables) {
235+
String sinkTableSchema = sinkTable.getSchema();
236+
String sinkTableSchemaTableName = sinkTable.getSchemaTableName();
237+
String currentSinkTableName = sinkTableSchemaTableName;
238+
if (Asserts.isContainsString(sinkTableSchemaTableName, ".")) {
239+
currentSinkTableName = sinkTableSchemaTableName.split("\\.")[1];
240+
}
241+
if (sinkTableName.equals(currentSinkTableName)) {
242+
if (null != sinkRealDriver) {
243+
sinkTable.setColumns(sinkRealDriver.listColumnsSortByPK(sinkTableSchema, currentSinkTableName));
244+
}
245+
table.setSinkTable(sinkTable);
246+
break;
247+
}
248+
}
249+
}
250+
251+
private List<Table> getSinkTables(FlinkCDCConfig config, String schemaName) throws Exception {
252+
List<Table> sinkTables;
253+
Driver sinkDriver = getDriver(config, schemaName);
254+
if (null == sinkDriver) {
255+
return new ArrayList<>();
256+
}
257+
Map<String, String> sink = config.getSink();
258+
String schema = schemaName;
259+
String sinkDb = sink.get(FlinkCDCConfig.SINK_DB);
260+
if (Asserts.isNotNullString(sinkDb)) {
261+
schema = SqlUtil.replaceAllParam(sinkDb, "schemaName", schemaName);
262+
}
263+
sinkTables = sinkDriver.listTables(schema);
264+
return sinkTables;
265+
}
266+
226267
private Driver checkAndCreateSinkSchema(FlinkCDCConfig config, String schemaName) throws Exception {
227268
Map<String, String> sink = config.getSink();
228269
String autoCreate = sink.get(FlinkCDCConfig.AUTO_CREATE);
229270
if (!Asserts.isEqualsIgnoreCase(autoCreate, "true") || Asserts.isNullString(schemaName)) {
230271
return null;
231272
}
232-
String url = sink.get("url");
273+
return getDriver(config, schemaName);
274+
}
275+
276+
private Driver getDriver(FlinkCDCConfig config, String schemaName) throws Exception {
277+
Map<String, String> sink = config.getSink();
278+
String connector = sink.get("connector");
279+
String url;
280+
if (Asserts.isEquals(connector, "starrocks")) {
281+
url = sink.get("jdbc-url");
282+
} else if (Asserts.isEqualsIgnoreCase(connector, "doris")) {
283+
url = "jdbc:mysql://" + sink.get("fenodes");
284+
} else if (Asserts.isEqualsIgnoreCase(connector, "jdbc")) {
285+
url = sink.get("url");
286+
} else {
287+
return null;
288+
}
233289
String schema = schemaName;
234290
String sinkDb = sink.get(FlinkCDCConfig.SINK_DB);
235291
if (Asserts.isNotNullString(sinkDb)) {
236292
schema = SqlUtil.replaceAllParam(sinkDb, "schemaName", schemaName);
237293
}
238-
Driver driver = Driver.build(sink.get("connector"), url, sink.get("username"), sink.get("password"));
294+
295+
Driver driver = Driver.build(connector, url, sink.get("username"), sink.get("password"));
239296
if (null != driver && !driver.existSchema(schema)) {
240297
driver.createSchema(schema);
241298
}

0 commit comments

Comments
 (0)