diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/offset/PostgresOffset.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/offset/PostgresOffset.java index 032d35fa5c8..960cdf3bbd5 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/offset/PostgresOffset.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/offset/PostgresOffset.java @@ -19,6 +19,7 @@ import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset; +import io.debezium.connector.postgresql.PostgresOffsetContext; import io.debezium.connector.postgresql.SourceInfo; import io.debezium.connector.postgresql.connection.Lsn; import io.debezium.time.Conversions; @@ -43,7 +44,34 @@ public class PostgresOffset extends Offset { // used by PostgresOffsetFactory PostgresOffset(Map offset) { - this.offset = offset; + Map filtered = new HashMap<>(offset); + // NOTE(review): the copy throws NPE if offset is null (plain assignment did not); confirm. + // + // When a checkpoint is taken right after a COMMIT (state-3a), all three LSN fields + // converge to the same value: lsn == lsn_proc == lsn_commit. + // Recovering from such a checkpoint constructs WalPositionLocator(C0, C0), which + // causes the first new transaction's DML records (data_start=C0 in pgoutput) to be + // silently dropped: they are added to lsnSeen during the find phase, but startStreamingLsn + // is set to the next COMMIT (C1), so they are filtered in the stream phase. + // + // Workaround: when lsn == lsn_proc == lsn_commit, remove lsn_proc and lsn_commit so that + // WalPositionLocator is constructed with lastCommitStoredLsn=null, which triggers + // the fast path: startStreamingLsn=firstLsnReceived=C0, all messages pass through. 
+ // + // The triple-equality condition is safe: mid-transaction checkpoints (state-3b) have + // lsn_commit pointing to the previous commit, so lsn_commit != lsn and nothing is removed. + // + // This workaround can be removed once Debezium is upgraded to a version that + // includes DBZ-6204: + // https://github.com/debezium/debezium/commit/3b5740f1a836c8b438888f2458ebb1554320bac7 + String lsnVal = filtered.get(SourceInfo.LSN_KEY); + String lsnProc = filtered.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY); + String lsnCommit = filtered.get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY); + if (lsnVal != null && lsnVal.equals(lsnProc) && lsnVal.equals(lsnCommit)) { + filtered.remove(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY); + filtered.remove(PostgresOffsetContext.LAST_COMMIT_LSN_KEY); + } + this.offset = filtered; } PostgresOffset(Long lsn, Long txId, Instant lastCommitTs) {