Skip to content

Commit 32bef0e

Browse files
authored
[Feature][Connector-V2][SqlServer CDC] support sqlserver schema evolution (#10890)
1 parent df468e5 commit 32bef0e

20 files changed

Lines changed: 1909 additions & 61 deletions

File tree

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ private SourceRecordUtils() {}
5151
public static final List<String> SUPPORT_SCHEMA_CHANGE_EVENT_KEY_NAME =
5252
Arrays.asList(
5353
"io.debezium.connector.mysql.SchemaChangeKey",
54-
"io.debezium.connector.oracle.SchemaChangeKey");
54+
"io.debezium.connector.oracle.SchemaChangeKey",
55+
"io.debezium.connector.sqlserver.SchemaChangeKey");
5556

5657
public static final String HEARTBEAT_VALUE_SCHEMA_KEY_NAME =
5758
"io.debezium.connector.common.Heartbeat";

seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java

Lines changed: 125 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,15 @@ public class SqlServerConnection extends JdbcConnection {
129129

130130
private static final String GET_NEW_CHANGE_TABLES =
131131
"SELECT * FROM [#db].cdc.change_tables WHERE start_lsn BETWEEN ? AND ?";
132+
private static final String GET_DDL_HISTORY =
133+
"SELECT OBJECT_SCHEMA_NAME(source_object_id, DB_ID(?)),"
134+
+ " OBJECT_NAME(source_object_id, DB_ID(?)),"
135+
+ " ddl_command,"
136+
+ " ddl_lsn,"
137+
+ " ddl_time"
138+
+ " FROM [#db].cdc.ddl_history"
139+
+ " WHERE ddl_lsn > ? AND ddl_lsn <= ?"
140+
+ " ORDER BY ddl_lsn ASC";
132141
private static final String OPENING_QUOTING_CHARACTER = "[";
133142
private static final String CLOSING_QUOTING_CHARACTER = "]";
134143

@@ -377,36 +386,29 @@ public Lsn getMinLsn(String databaseName, String changeTableName) throws SQLExce
377386
protected Optional<ColumnEditor> readTableColumn(
378387
ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter columnFilter)
379388
throws SQLException {
380-
return doReadTableColumn(columnMetadata, tableId, columnFilter);
389+
return doReadTableColumn(columnMetadata, tableId, columnFilter, null);
381390
}
382391

392+
/**
393+
* Reads a single column from the JDBC metadata ResultSet.
394+
*
395+
* @param columnTypeMapping pre-fetched UDT name map (column name → type name) for the table, or
396+
* {@code null} to fall back to per-column query
397+
*/
383398
private Optional<ColumnEditor> doReadTableColumn(
384-
ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter columnFilter)
399+
ResultSet columnMetadata,
400+
TableId tableId,
401+
Tables.ColumnNameFilter columnFilter,
402+
Map<String, String> columnTypeMapping)
385403
throws SQLException {
386404
// Oracle drivers require this for LONG/LONGRAW to be fetched first.
387405
final String defaultValue = columnMetadata.getString(13);
388-
String tableSql =
389-
StringUtils.isNotEmpty(tableId.table())
390-
? "AND tbl.name = '" + tableId.table() + "'"
391-
: "";
392-
393-
Map<String, String> columnTypeMapping = new HashMap<>();
394406

395-
// Support user-defined types (UDTs)
396-
try (PreparedStatement ps =
397-
connection()
398-
.prepareStatement(
399-
String.format(
400-
SELECT_COLUMNS_SQL_TEMPLATE,
401-
tableId.schema(),
402-
tableSql));
403-
ResultSet resultSet = ps.executeQuery()) {
404-
while (resultSet.next()) {
405-
String columnName = resultSet.getString("column_name");
406-
String dataType = resultSet.getString("type");
407-
columnTypeMapping.put(columnName, dataType);
408-
}
407+
if (columnTypeMapping == null) {
408+
// Fallback: fetch UDT mapping for this single call
409+
columnTypeMapping = fetchColumnTypeMapping(tableId);
409410
}
411+
410412
final String columnName = columnMetadata.getString(4);
411413
if (columnFilter == null
412414
|| columnFilter.matches(
@@ -446,6 +448,32 @@ private Optional<ColumnEditor> doReadTableColumn(
446448
return Optional.empty();
447449
}
448450

451+
/**
452+
* Fetches UDT type names for all columns in {@code tableId} in a single query.
453+
*
454+
* @return map of column name -> resolved type name (empty map if the query returns no rows)
455+
*/
456+
private Map<String, String> fetchColumnTypeMapping(TableId tableId) throws SQLException {
457+
String tableSql =
458+
StringUtils.isNotEmpty(tableId.table())
459+
? "AND tbl.name = '" + tableId.table() + "'"
460+
: "";
461+
Map<String, String> mapping = new HashMap<>();
462+
try (PreparedStatement ps =
463+
connection()
464+
.prepareStatement(
465+
String.format(
466+
SELECT_COLUMNS_SQL_TEMPLATE,
467+
tableId.schema(),
468+
tableSql));
469+
ResultSet resultSet = ps.executeQuery()) {
470+
while (resultSet.next()) {
471+
mapping.put(resultSet.getString("column_name"), resultSet.getString("type"));
472+
}
473+
}
474+
return mapping;
475+
}
476+
449477
/**
450478
* Provides all changes recorder by the SQL Server CDC capture process for a set of tables.
451479
*
@@ -695,12 +723,42 @@ public List<SqlServerChangeTable> getNewChangeTables(
695723
});
696724
}
697725

726+
public List<SqlServerDdlEntry> getDdlHistory(String databaseName, Lsn fromLsn, Lsn toLsn)
727+
throws SQLException {
728+
final String query = replaceDatabaseNamePlaceholder(GET_DDL_HISTORY, databaseName);
729+
730+
return prepareQueryAndMap(
731+
query,
732+
ps -> {
733+
ps.setString(1, databaseName);
734+
ps.setString(2, databaseName);
735+
ps.setBytes(3, fromLsn.getBinary());
736+
ps.setBytes(4, toLsn.getBinary());
737+
},
738+
rs -> {
739+
final List<SqlServerDdlEntry> ddlEntries = new ArrayList<>();
740+
while (rs.next()) {
741+
ddlEntries.add(
742+
new SqlServerDdlEntry(
743+
new TableId(databaseName, rs.getString(1), rs.getString(2)),
744+
rs.getString(3),
745+
Lsn.valueOf(rs.getBytes(4)),
746+
rs.getTimestamp(5)));
747+
}
748+
return ddlEntries;
749+
});
750+
}
751+
698752
public Table getTableSchemaFromTable(String databaseName, SqlServerChangeTable changeTable)
699753
throws SQLException {
700754
final DatabaseMetaData metadata = connection().getMetaData();
701755
JdbcIdentifierUtils.IdentifierCaseStrategy identifierCaseStrategy =
702756
JdbcIdentifierUtils.identifierCaseStrategy(metadata);
703757

758+
// Fetch the UDT type mapping once for the whole table instead of once per column in the loop below
759+
final Map<String, String> columnTypeMapping =
760+
fetchColumnTypeMapping(changeTable.getSourceTableId());
761+
704762
List<Column> columns = new ArrayList<>();
705763
int filteredRows = 0;
706764
try (ResultSet rs =
@@ -728,7 +786,7 @@ public Table getTableSchemaFromTable(String databaseName, SqlServerChangeTable c
728786
filteredRows++;
729787
continue;
730788
}
731-
readTableColumn(rs, changeTable.getSourceTableId(), null)
789+
doReadTableColumn(rs, changeTable.getSourceTableId(), null, columnTypeMapping)
732790
.ifPresent(
733791
ce -> {
734792
// Filter out columns not included in the change table.
@@ -763,6 +821,50 @@ public String getNameOfChangeTable(String captureName) {
763821
return captureName + "_CT";
764822
}
765823

824+
public static class SqlServerDdlEntry {
825+
private final TableId sourceTableId;
826+
private final String ddl;
827+
private final Lsn ddlLsn;
828+
private final java.sql.Timestamp ddlTime;
829+
830+
public SqlServerDdlEntry(
831+
TableId sourceTableId, String ddl, Lsn ddlLsn, java.sql.Timestamp ddlTime) {
832+
this.sourceTableId = sourceTableId;
833+
this.ddl = ddl;
834+
this.ddlLsn = ddlLsn;
835+
this.ddlTime = ddlTime;
836+
}
837+
838+
public TableId getSourceTableId() {
839+
return sourceTableId;
840+
}
841+
842+
public String getDdl() {
843+
return ddl;
844+
}
845+
846+
public Lsn getDdlLsn() {
847+
return ddlLsn;
848+
}
849+
850+
public java.sql.Timestamp getDdlTime() {
851+
return ddlTime;
852+
}
853+
854+
@Override
855+
public String toString() {
856+
return "SqlServerDdlEntry{"
857+
+ "sourceTableId="
858+
+ sourceTableId
859+
+ ", ddlLsn="
860+
+ ddlLsn
861+
+ ", ddl='"
862+
+ ddl
863+
+ "'"
864+
+ '}';
865+
}
866+
}
867+
766868
/**
767869
* Retrieve the name of the database in the original case as it's defined on the server.
768870
*

0 commit comments

Comments
 (0)