
[Bug] [Module Name] Full-database sync from PostgreSQL to Doris does not synchronize physical (hard) DELETE events #4486

@qzztf

Description


Search before asking

  • I have searched the issues and found no similar issues.

What happened

When synchronizing an entire PostgreSQL database to Doris, physical (hard) DELETE events are not synchronized.

What you expected to happen

I expect physically deleted rows to be synchronized as well, i.e. deleted in Doris.

How to reproduce

SET 'table.exec.state.ttl' = '1h';
EXECUTE CDCSOURCE demo_doris
WITH
  (
    'connector' = 'postgres-cdc',
    'hostname' = '192.168.0.37',
    'port' = '5432',
    'database-name' = 'db',
    'schema-name' = 'public',
    'username' = 'root',
    'password' = 'root',
    'checkpoint' = '600000',
    'scan.startup.mode' = 'initial',
    'parallelism' = '1',
    'table-name' = 'public\.sys_dict_data2',
    'debezium.decimal.handling.mode' = 'string',
    'source.decoding.plugin.name' = 'pgoutput',
    'source.include.schema.changes' = 'true',
    'debezium.slot.name' = 'slot_sys_4',
    'sink.url' = 'jdbc:mysql://192.168.0.223:9030/ods_pg',
    'sink.auto.create' = 'true',
    'sink.connector' = 'doris',
    'sink.fenodes' = '192.168.0.223:8030',
    'sink.username' = 'root',
    'sink.password' = 'root',
    'sink.doris.batch.size' = '1000',
    'sink.sink.max-retries' = '1',
    'sink.sink.db' = 'ods_pg',
    'sink.sink.properties.format' = 'json',
    'sink.sink.properties.table.create' = 'true',
    'sink.sink.properties.table.create.properties.light_schema_change' = 'true',
    'sink.sink.properties.read_json_by_line' = 'true',
    --  'sink.sink.properties.timezone' = 'Etc/UTC',
    'sink.sink.enable-delete' = 'true',
    'sink.table.identifier' = '#{schemaName}.#{tableName}',
    'sink.sink.label-prefix' = '#{schemaName}_#{tableName}_19'
  );

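The full-database sync job is shown above; so far I have only tested it with one table. Inserts and updates are synchronized, but deletes are not. The PostgreSQL table's replica identity is DEFAULT.
I did a preliminary investigation of the code.
In org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer#processElement, a DELETE event is handled by retracting the row: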

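public void processElement(StreamRecord<RowData> element) throws Exception {
    final RowData row = element.getValue();
    // All rows currently materialized for the current key
    List<RowData> values = state.value();
    if (values == null) {
        values = new ArrayList<>(2);
    }

    switch (row.getRowKind()) {
        case INSERT:
        case UPDATE_AFTER:
            // Accumulate the incoming row
            addRow(values, row);
            break;

        case UPDATE_BEFORE:
        case DELETE:
            // Retract a previously added row; a change is only forwarded if a matching row is found in state
            retractRow(values, row);
            break;
    }
}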

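In retractRow, the operator looks up the index of the row to retract. If the index is -1, it returns immediately and only logs a message about state TTL (STATE_CLEARED_WARN_MSG). I added SET 'table.exec.state.ttl' = '1h'; to the sync job,
but it had no effect, so state expiry is not the root cause.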

private void retractRow(List<RowData> values, RowData retract) throws IOException {
    final int lastIndex = values.size() - 1;
    final int index = findFirst(values, retract);
    if (index == -1) {
        LOG.info(STATE_CLEARED_WARN_MSG);
        return;
    } else {
        // Remove first found row
        values.remove(index);
    }
    if (values.isEmpty()) {
        // Delete this row
        retract.setRowKind(DELETE);
        collector.collect(retract);
    } else if (index == lastIndex) {
        // Last row has been removed, update to the second last one
        final RowData latestRow = values.get(values.size() - 1);
        latestRow.setRowKind(UPDATE_AFTER);
        collector.collect(latestRow);
    }

    if (values.isEmpty()) {
        state.clear();
    } else {
        state.update(values);
    }
}
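To make the -1 path concrete, here is a minimal, self-contained Java model of retractRow when no upsert key is known, so rows are compared column by column. It is a sketch under stated assumptions, not Flink's code: RetractModel, findFirst and retract are hypothetical names, rows are plain List<Object> instead of RowData, and it assumes that with replica identity DEFAULT the DELETE arrives with only its primary-key column set and nulls elsewhere.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/** Hypothetical, simplified model of SinkUpsertMaterializer's retraction path without an upsert key. */
final class RetractModel {

    /** Full-row comparison, standing in for equaliser.equals(newRow, oldRow) when there is no upsert key. */
    static int findFirst(List<List<Object>> values, List<Object> retract) {
        for (int i = 0; i < values.size(); i++) {
            if (Objects.equals(values.get(i), retract)) {
                return i;
            }
        }
        return -1;
    }

    /** Returns the emitted row kind, or null when nothing is emitted. */
    static String retract(List<List<Object>> values, List<Object> retract) {
        final int lastIndex = values.size() - 1;
        final int index = findFirst(values, retract);
        if (index == -1) {
            // Mirrors LOG.info(STATE_CLEARED_WARN_MSG); return; : the DELETE never reaches the sink
            return null;
        }
        values.remove(index);
        if (values.isEmpty()) {
            return "DELETE";
        } else if (index == lastIndex) {
            return "UPDATE_AFTER";
        }
        return null;
    }

    public static void main(String[] args) {
        // State holds the row previously inserted for this key: (id=1, label="a", value="x")
        List<List<Object>> state = new ArrayList<>();
        state.add(Arrays.asList(1, "a", "x"));

        // Assumed REPLICA IDENTITY DEFAULT before-image: only the key column is populated
        List<Object> defaultBefore = Arrays.asList(1, null, null);
        System.out.println(retract(state, defaultBefore));

        // Assumed REPLICA IDENTITY FULL before-image: every column is populated
        List<Object> fullBefore = Arrays.asList(1, "a", "x");
        System.out.println(retract(state, fullBefore));
    }
}
```

Running main prints null (the DELETE is dropped and the row stays in state) followed by DELETE, which matches the DEFAULT vs FULL behavior observed in this job.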

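The index lookup (findFirst, which compares rows via equalsIgnoreRowKind below) checks whether an upsert key (primary key) is available, and the two cases use different logic. In my job, execution takes the no-key branch, so deletes are only synchronized after changing the PostgreSQL table's replica identity to FULL; with DEFAULT they are not. I suspect this is because, with DEFAULT, PostgreSQL only logs the primary-key columns of a deleted row, so the retracted row's other columns are null and the full-row comparison never matches the row held in state.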

private boolean equalsIgnoreRowKind(RowData newRow, RowData oldRow) {
    newRow.setRowKind(oldRow.getRowKind());
    if (hasUpsertKey) {
        // Compare only the upsert-key columns
        return equaliser.equals(
                upsertKeyProjectedRow1.replaceRow(newRow),
                upsertKeyProjectedRow2.replaceRow(oldRow));
    }
    // No upsert key: every column must match
    return equaliser.equals(newRow, oldRow);
}
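The difference between the two branches of equalsIgnoreRowKind can be sketched in plain Java. This is a hypothetical model, not Flink's API: EqualsModel, keyEquals and fullEquals are illustrative names, rows are List<Object>, the key is given as column positions, and the DELETE before-image is again assumed to carry only the primary-key column under replica identity DEFAULT.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/** Hypothetical model of the two comparison branches in equalsIgnoreRowKind. */
final class EqualsModel {

    /** hasUpsertKey branch: compare only the columns at the key positions. */
    static boolean keyEquals(List<Object> newRow, List<Object> oldRow, int[] keyIndexes) {
        for (int i : keyIndexes) {
            if (!Objects.equals(newRow.get(i), oldRow.get(i))) {
                return false;
            }
        }
        return true;
    }

    /** No-upsert-key branch: every column must be equal. */
    static boolean fullEquals(List<Object> newRow, List<Object> oldRow) {
        return Objects.equals(newRow, oldRow);
    }

    public static void main(String[] args) {
        List<Object> stored = Arrays.asList(1, "a", "x");
        // Assumed DEFAULT-replica-identity before-image: only column 0 (the primary key) is set
        List<Object> deleteBefore = Arrays.asList(1, null, null);
        int[] primaryKey = {0};

        System.out.println(fullEquals(deleteBefore, stored));
        System.out.println(keyEquals(deleteBefore, stored, primaryKey));
    }
}
```

main prints false and then true: with a known key, the key-only DELETE would match the stored row, which is why losing the primary key on the way into the sink appears to matter here.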

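After a fair amount of debugging, my preliminary finding is that the primary key is lost when the source table view is created, even though the table does have a primary key. The relevant method is org.dinky.cdc.sql.SQLSinkBuilder#addSourceTableView: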

private String addSourceTableView(DataStream<Row> rowDataDataStream, Table table) {
    // Flink view names are not allowed to contain '-', so '-' is replaced with '_' here
    String viewName = replaceViewNameMiddleLineToUnderLine("VIEW_" + table.getSchemaTableNameWithUnderline());

    customTableEnvironment.createTemporaryView(
            viewName, customTableEnvironment.fromChangelogStream(rowDataDataStream));
    logger.info("Create {} temporaryView successful...", viewName);
    return viewName;
}
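The body of replaceViewNameMiddleLineToUnderLine is not shown in this issue. Going only by the code comment, it presumably swaps every '-' for '_'; the following is a hypothetical stand-in under that assumption (the class name ViewNames and the sample table name are illustrative, not Dinky's code).

```java
/** Hypothetical stand-in for SQLSinkBuilder#replaceViewNameMiddleLineToUnderLine; the real body is not shown. */
final class ViewNames {

    /** Replaces every '-' with '_', since '-' is not allowed in the Flink view name. */
    static String replaceViewNameMiddleLineToUnderLine(String viewName) {
        return viewName.replace('-', '_');
    }

    public static void main(String[] args) {
        // Hypothetical table name containing a hyphen
        System.out.println(replaceViewNameMiddleLineToUnderLine("VIEW_public_my-table"));
    }
}
```

This prints VIEW_public_my_table. Note that the view is built from fromChangelogStream(rowDataDataStream) alone, so nothing in this method declares a primary key on it.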

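I'm a beginner, so I asked an AI assistant, which said that when converting a DataStream to a Table the primary key has to be specified manually; it is not inferred from the DataStream.
I've only just started learning Flink, so I'm not sure whether this is what prevents the deletes from being synchronized. I'd appreciate it if someone more experienced could help look into this.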

Anything else

No response

Version

1.2.0

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Assignees: No one assigned

Labels: Bug (Something isn't working), Invalid