diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2ScanFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2ScanFetchTask.java index d6baf843a5c..b1a680b23e7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2ScanFetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2ScanFetchTask.java @@ -305,6 +305,12 @@ private void createDataEventsForTable( Strings.duration(clock.currentTimeInMillis() - exportStart)); } catch (SQLException e) { throw new ConnectException("Snapshotting of table " + table.id() + " failed", e); + } finally { + try { + jdbcConnection.connection().setAutoCommit(true); + } catch (SQLException e) { + LOG.warn("Failed to set autoCommit after snapshot split read", e); + } } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java index 4ce09a1e1c6..7a3ac0e72ff 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java @@ -319,6 +319,12 @@ private void createDataEventsForTable( Strings.duration(clock.currentTimeInMillis() - exportStart)); } catch (SQLException e) { throw new ConnectException("Snapshotting of table " + table.id() + " failed", e); + } finally { + try { + jdbcConnection.connection().setAutoCommit(true); + } catch (SQLException e) { + LOG.warn("Failed to set autoCommit after snapshot split read", e); + } } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java index 1915aca809c..631d174d47a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java @@ -353,6 +353,12 @@ private void createDataEventsForTable( } catch (SQLException e) { throw new FlinkRuntimeException( "Snapshotting of table " + table.id() + " failed", e); + } finally { + try { + jdbcConnection.connection().setAutoCommit(true); + } catch (SQLException e) { + LOG.warn("Failed to set autoCommit after snapshot split read", e); + } } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTask.java index a2c266cb237..bc0aea6943d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTask.java @@ -309,6 +309,12 @@ private void createDataEventsForTable( Strings.duration(clock.currentTimeInMillis() - exportStart)); } catch (SQLException e) { throw new ConnectException("Snapshotting of table " + table.id() + " failed", e); + } finally { + try { + jdbcConnection.connection().setAutoCommit(true); + } catch (SQLException e) { + LOG.warn("Failed to set autoCommit after snapshot split read", e); + } } }