Search before asking
What happened
When synchronizing an entire PostgreSQL database to Doris, physically deleted rows (DELETE events) are not synchronized.
What you expected to happen
Physically deleted rows should be synchronized as well.
How to reproduce
SET 'table.exec.state.ttl' = '1h';
EXECUTE CDCSOURCE demo_doris
WITH
(
'connector' = 'postgres-cdc',
'hostname' = '192.168.0.37',
'port' = '5432',
'database-name' = 'db',
'schema-name' = 'public',
'username' = 'root',
'password' = 'root',
'checkpoint' = '600000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'public\.sys_dict_data2',
'debezium.decimal.handling.mode' = 'string',
'source.decoding.plugin.name' = 'pgoutput',
'source.include.schema.changes' = 'true',
'debezium.slot.name' = 'slot_sys_4',
'sink.url' = 'jdbc:mysql://192.168.0.223:9030/ods_pg',
'sink.auto.create' = 'true',
'sink.connector' = 'doris',
'sink.fenodes' = '192.168.0.223:8030',
'sink.username' = 'root',
'sink.password' = 'root',
'sink.doris.batch.size' = '1000',
'sink.sink.max-retries' = '1',
'sink.sink.db' = 'ods_pg',
'sink.sink.properties.format' = 'json',
'sink.sink.properties.table.create' = 'true',
'sink.sink.properties.table.create.properties.light_schema_change' = 'true',
'sink.sink.properties.read_json_by_line' = 'true',
-- 'sink.sink.properties.timezone' = 'Etc/UTC',
'sink.sink.enable-delete' = 'true',
'sink.table.identifier' = '#{schemaName}.#{tableName}',
'sink.sink.label-prefix' = '#{schemaName}_#{tableName}_19'
);
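The whole-database sync job is shown above; so far it has been tested with only one table. Inserts and updates are synchronized, but deletes are not. The PostgreSQL table's replica identity is DEFAULT.
After a first look at the code:
In org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#processElement, a DELETE event is handled as a retraction: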
public void processElement(StreamRecord<RowData> element) throws Exception {
    final RowData row = element.getValue();
    List<RowData> values = state.value();
    if (values == null) {
        values = new ArrayList<>(2);
    }
    switch (row.getRowKind()) {
        case INSERT:
        case UPDATE_AFTER:
            addRow(values, row);
            break;
        case UPDATE_BEFORE:
        case DELETE:
            retractRow(values, row);
            break;
    }
}
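retractRow looks up an index; if the index is -1, the method returns immediately and logs a message about state TTL. I added SET 'table.exec.state.ttl' = '1h'; to the sync job, but it had no effect, so that is not the root cause.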
private void retractRow(List<RowData> values, RowData retract) throws IOException {
    final int lastIndex = values.size() - 1;
    final int index = findFirst(values, retract);
    if (index == -1) {
        LOG.info(STATE_CLEARED_WARN_MSG);
        return;
    } else {
        // Remove first found row
        values.remove(index);
    }
    if (values.isEmpty()) {
        // Delete this row
        retract.setRowKind(DELETE);
        collector.collect(retract);
    } else if (index == lastIndex) {
        // Last row has been removed, update to the second last one
        final RowData latestRow = values.get(values.size() - 1);
        latestRow.setRowKind(UPDATE_AFTER);
        collector.collect(latestRow);
    }
    if (values.isEmpty()) {
        state.clear();
    } else {
        state.update(values);
    }
}
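The index lookup checks whether there is a primary key (hasUpsertKey in the code), and the two cases are handled differently. In my run the no-primary-key branch was taken: deletes are synchronized only after changing the PostgreSQL table's replica identity to FULL; with DEFAULT they are not.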
private boolean equalsIgnoreRowKind(RowData newRow, RowData oldRow) {
    newRow.setRowKind(oldRow.getRowKind());
    if (hasUpsertKey) {
        return equaliser.equals(
                upsertKeyProjectedRow1.replaceRow(newRow),
                upsertKeyProjectedRow2.replaceRow(oldRow));
    }
    return equaliser.equals(newRow, oldRow);
}
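To make the difference between the two branches concrete, here is a minimal, self-contained sketch of the lookup. It is not Flink code: the class RetractLookupSketch, the Object[] row layout, and the column values are all hypothetical stand-ins for RowData and the generated equaliser. It also assumes that with replica identity DEFAULT the DELETE event carries only the primary-key column and leaves the other columns null, which would be consistent with the behaviour described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/** Simplified stand-in for the row lookup in SinkUpsertMaterializer (hypothetical, not Flink code). */
class RetractLookupSketch {

    /** State after one INSERT: a single full row (id, label, value). */
    static List<Object[]> sampleState() {
        List<Object[]> state = new ArrayList<>();
        state.add(new Object[] {1L, "label", "value"});
        return state;
    }

    /** Returns the index of the first stored row that matches the retraction, or -1. */
    static int findFirst(List<Object[]> state, Object[] retract, int[] upsertKey) {
        for (int i = 0; i < state.size(); i++) {
            if (matches(state.get(i), retract, upsertKey)) {
                return i;
            }
        }
        return -1;
    }

    /** With an upsert key only the key columns are compared; without one, every column is. */
    static boolean matches(Object[] stored, Object[] retract, int[] upsertKey) {
        if (upsertKey == null) {
            return Arrays.equals(stored, retract);
        }
        for (int k : upsertKey) {
            if (!Objects.equals(stored[k], retract[k])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Assumed shape of a DELETE under replica identity DEFAULT: only the key is populated.
        Object[] deleteDefault = {1L, null, null};
        // Assumed shape of a DELETE under replica identity FULL: the whole old row is populated.
        Object[] deleteFull = {1L, "label", "value"};
        int[] keyOnId = {0};

        System.out.println(findFirst(sampleState(), deleteDefault, null));    // -1, delete is dropped
        System.out.println(findFirst(sampleState(), deleteFull, null));       // 0, delete is emitted
        System.out.println(findFirst(sampleState(), deleteDefault, keyOnId)); // 0, delete is emitted
    }
}
```

Under these assumptions, a full-row comparison can only succeed when the DELETE carries every column, while a key-only comparison succeeds in both cases. That would explain why FULL works and DEFAULT does not when no upsert key is known to the planner.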
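After a fair amount of debugging, it looks like the primary key is lost when the source table view is created, even though the table actually has a primary key. The method is org.dinky.cdc.sql.SQLSinkBuilder#addSourceTableView: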
private String addSourceTableView(DataStream<Row> rowDataDataStream, Table table) {
    // Flink view names may not contain '-', so it is replaced with '_' here
    String viewName = replaceViewNameMiddleLineToUnderLine("VIEW_" + table.getSchemaTableNameWithUnderline());
    customTableEnvironment.createTemporaryView(
            viewName, customTableEnvironment.fromChangelogStream(rowDataDataStream));
    logger.info("Create {} temporaryView successful...", viewName);
    return viewName;
}
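I'm a beginner and asked an AI about this: when a DataStream is converted to a Table, the primary key has to be specified manually; it is not inferred from the DataStream.
I've only just started learning Flink, so I'm not sure whether this is what prevents deletes from being synchronized. I'd appreciate help from the experts in tracking this down.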
Anything else
No response
Version
1.2.0
Are you willing to submit PR?
Code of Conduct