From dddcea6c522b08989197a70a0b1f91f13b33f19f Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Wed, 18 Mar 2026 17:49:31 +0800 Subject: [PATCH 1/4] fix --- .../connectors/postgres/source/offset/PostgresOffset.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/offset/PostgresOffset.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/offset/PostgresOffset.java index 032d35fa5c8..2d8d1dec78e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/offset/PostgresOffset.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/offset/PostgresOffset.java @@ -19,6 +19,7 @@ import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset; +import io.debezium.connector.postgresql.PostgresOffsetContext; import io.debezium.connector.postgresql.SourceInfo; import io.debezium.connector.postgresql.connection.Lsn; import io.debezium.time.Conversions; @@ -43,7 +44,10 @@ public class PostgresOffset extends Offset { // used by PostgresOffsetFactory PostgresOffset(Map offset) { - this.offset = offset; + Map filtered = new HashMap<>(offset); + filtered.remove(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY); // lsn_proc + filtered.remove(PostgresOffsetContext.LAST_COMMIT_LSN_KEY); // lsn_commit + this.offset = filtered; } PostgresOffset(Long lsn, Long txId, Instant lastCommitTs) { From 36cb6eb49aaab58a8411546db4b5ac3106bde559 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 19 Mar 2026 11:29:07 +0800 Subject: [PATCH 2/4] code style --- .../cdc/connectors/postgres/source/offset/PostgresOffset.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/offset/PostgresOffset.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/offset/PostgresOffset.java index 2d8d1dec78e..f30c7762501 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/offset/PostgresOffset.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/offset/PostgresOffset.java @@ -46,7 +46,7 @@ public class PostgresOffset extends Offset { PostgresOffset(Map offset) { Map filtered = new HashMap<>(offset); filtered.remove(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY); // lsn_proc - filtered.remove(PostgresOffsetContext.LAST_COMMIT_LSN_KEY); // lsn_commit + filtered.remove(PostgresOffsetContext.LAST_COMMIT_LSN_KEY); // lsn_commit this.offset = filtered; } From 6f56d775f356130863e8dc7d4013bf48a0012459 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Fri, 20 Mar 2026 12:46:44 +0800 Subject: [PATCH 3/4] fix --- .../source/offset/PostgresOffset.java | 26 +++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/offset/PostgresOffset.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/offset/PostgresOffset.java index f30c7762501..3b3bc46dd5c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/offset/PostgresOffset.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/offset/PostgresOffset.java @@ -45,8 +45,30 @@ public class PostgresOffset extends Offset { // used by PostgresOffsetFactory PostgresOffset(Map offset) { Map filtered = new HashMap<>(offset); - filtered.remove(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY); // lsn_proc - filtered.remove(PostgresOffsetContext.LAST_COMMIT_LSN_KEY); // lsn_commit + // When lsn == lsn_proc, WalPositionLocator is constructed with + // (lastCommitStoredLsn=Y, lastEventStoredLsn=Y). In pgoutput non-streaming mode, + // BEGIN and DML of the first new transaction after recovery share the same + // data_start as the previous COMMIT LSN (Y). WalPositionLocator puts Y into + // lsnSeen but sets startStreamingLsn to the new COMMIT LSN (Z), causing those + // DML records to be silently dropped in the actual streaming phase + // (Y in lsnSeen, Y != Z -> filtered). + // + // Fix: when lsn == lsn_proc, remove lsn_proc and lsn_commit so that + // WalPositionLocator is constructed with lastCommitStoredLsn=null, which + // triggers the fast path: startStreamingLsn=firstLsnReceived=Y, switch-off + // happens immediately and all messages pass through. + // + // This is fixed upstream in Debezium via DBZ-6204 (adding Operation.COMMIT to + // the lastProcessedMessageType check in WalPositionLocator.resumeFromLsn): + // https://github.com/debezium/debezium/commit/3b5740f1a836c8b438888f2458ebb1554320bac7 + // This workaround can be removed once Debezium is upgraded to a version that + // includes DBZ-6204. + String lsnVal = filtered.get(SourceInfo.LSN_KEY); + String lsnProc = filtered.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY); + if (lsnVal != null && lsnVal.equals(lsnProc)) { + filtered.remove(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY); + filtered.remove(PostgresOffsetContext.LAST_COMMIT_LSN_KEY); + } this.offset = filtered; } From 22b31665f03ba4713f1412651bd91cfb8e498dfe Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Fri, 20 Mar 2026 16:40:08 +0800 Subject: [PATCH 4/4] fix --- .../source/offset/PostgresOffset.java | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/offset/PostgresOffset.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/offset/PostgresOffset.java index 3b3bc46dd5c..960cdf3bbd5 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/offset/PostgresOffset.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/offset/PostgresOffset.java @@ -45,27 +45,29 @@ public class PostgresOffset extends Offset { // used by PostgresOffsetFactory PostgresOffset(Map offset) { Map filtered = new HashMap<>(offset); - // When lsn == lsn_proc, WalPositionLocator is constructed with - // (lastCommitStoredLsn=Y, lastEventStoredLsn=Y). In pgoutput non-streaming mode, - // BEGIN and DML of the first new transaction after recovery share the same - // data_start as the previous COMMIT LSN (Y). WalPositionLocator puts Y into - // lsnSeen but sets startStreamingLsn to the new COMMIT LSN (Z), causing those - // DML records to be silently dropped in the actual streaming phase - // (Y in lsnSeen, Y != Z -> filtered). + // When a checkpoint is taken right after a COMMIT (state-3a), all three LSN fields + // converge to the same value: lsn == lsn_proc == lsn_commit. + // Recovering from such a checkpoint constructs WalPositionLocator(C0, C0), which + // causes the first new transaction's DML records (data_start=C0 in pgoutput) to be + // silently dropped: they are added to lsnSeen during the find phase, but + // startStreamingLsn is set to the next COMMIT (C1), so they are filtered in the + // stream phase. // - // Fix: when lsn == lsn_proc, remove lsn_proc and lsn_commit so that - // WalPositionLocator is constructed with lastCommitStoredLsn=null, which - // triggers the fast path: startStreamingLsn=firstLsnReceived=Y, switch-off - // happens immediately and all messages pass through. + // Fix: when lsn == lsn_proc == lsn_commit, remove lsn_proc and lsn_commit so that + // WalPositionLocator is constructed with lastCommitStoredLsn=null, which triggers + // the fast path: startStreamingLsn=firstLsnReceived=C0, all messages pass through. + // + // The triple-equality condition is safe: mid-transaction checkpoints (state-3b) have + // lsn_commit pointing to the previous commit, so lsn_commit != lsn, and this branch + // is not taken. // - // This is fixed upstream in Debezium via DBZ-6204 (adding Operation.COMMIT to - // the lastProcessedMessageType check in WalPositionLocator.resumeFromLsn): - // https://github.com/debezium/debezium/commit/3b5740f1a836c8b438888f2458ebb1554320bac7 // This workaround can be removed once Debezium is upgraded to a version that - // includes DBZ-6204. + // includes DBZ-6204: + // https://github.com/debezium/debezium/commit/3b5740f1a836c8b438888f2458ebb1554320bac7 String lsnVal = filtered.get(SourceInfo.LSN_KEY); String lsnProc = filtered.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY); - if (lsnVal != null && lsnVal.equals(lsnProc)) { + String lsnCommit = filtered.get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY); + if (lsnVal != null && lsnVal.equals(lsnProc) && lsnVal.equals(lsnCommit)) { filtered.remove(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY); filtered.remove(PostgresOffsetContext.LAST_COMMIT_LSN_KEY); }