From 1ed836aea5f679963ca2d549772a4880a66ed86a Mon Sep 17 00:00:00 2001 From: SkylerLin <44233950+linguoxuan@users.noreply.github.com> Date: Tue, 7 Apr 2026 17:33:47 +0800 Subject: [PATCH] [FLINK-39403][cdc] Fix transaction leak during snapshot split read for DB2, Oracle, PostgreSQL, and SQL Server connectors --- .../cdc/connectors/db2/source/fetch/Db2ScanFetchTask.java | 6 ++++++ .../oracle/source/reader/fetch/OracleScanFetchTask.java | 6 ++++++ .../postgres/source/fetch/PostgresScanFetchTask.java | 6 ++++++ .../source/reader/fetch/SqlServerScanFetchTask.java | 6 ++++++ 4 files changed, 24 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2ScanFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2ScanFetchTask.java index d6baf843a5c..b1a680b23e7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2ScanFetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2ScanFetchTask.java @@ -305,6 +305,12 @@ private void createDataEventsForTable( Strings.duration(clock.currentTimeInMillis() - exportStart)); } catch (SQLException e) { throw new ConnectException("Snapshotting of table " + table.id() + " failed", e); + } finally { + try { + jdbcConnection.connection().setAutoCommit(true); + } catch (SQLException e) { + LOG.warn("Failed to set autoCommit after snapshot split read", e); + } } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java index 4ce09a1e1c6..7a3ac0e72ff 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java @@ -319,6 +319,12 @@ private void createDataEventsForTable( Strings.duration(clock.currentTimeInMillis() - exportStart)); } catch (SQLException e) { throw new ConnectException("Snapshotting of table " + table.id() + " failed", e); + } finally { + try { + jdbcConnection.connection().setAutoCommit(true); + } catch (SQLException e) { + LOG.warn("Failed to set autoCommit after snapshot split read", e); + } } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java index 1915aca809c..631d174d47a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java @@ -353,6 +353,12 @@ private void createDataEventsForTable( } catch (SQLException e) { throw new FlinkRuntimeException( "Snapshotting of table " + table.id() + " failed", e); + } finally { + try { + jdbcConnection.connection().setAutoCommit(true); + } catch (SQLException e) { + LOG.warn("Failed to set autoCommit after snapshot split read", e); + } } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTask.java index a2c266cb237..bc0aea6943d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTask.java @@ -309,6 +309,12 @@ private void createDataEventsForTable( Strings.duration(clock.currentTimeInMillis() - exportStart)); } catch (SQLException e) { throw new ConnectException("Snapshotting of table " + table.id() + " failed", e); + } finally { + try { + jdbcConnection.connection().setAutoCommit(true); + } catch (SQLException e) { + LOG.warn("Failed to set autoCommit after snapshot split read", e); + } } }