Skip to content

Commit 940c243

Browse files
committed
resolve comments
1 parent 0bdbe16 commit 940c243

4 files changed

Lines changed: 107 additions & 20 deletions

File tree

seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerSchemaChangeResolver.java

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ public class SqlServerSchemaChangeResolver implements SchemaChangeResolver {
7171
private static final Pattern ALTER_TABLE_PATTERN =
7272
Pattern.compile(
7373
"(?i)ALTER\\s+TABLE\\s+(.+?)\\s+(ADD|DROP|ALTER|RENAME|WITH|SWITCH)\\b");
74+
private static final Pattern SP_RENAME_COLUMN_PATTERN =
75+
Pattern.compile(
76+
"(?i)(?:EXEC(?:UTE)?\\s+)?(?:(?:\\[[^\\]]+\\]|[^.\\s]+)\\.)?(?:sys\\.)?sp_rename\\s+N?'([^']+)'\\s*,\\s*N?'([^']+)'\\s*,\\s*N?'COLUMN'");
7477
private static final Pattern TABLE_IDENTIFIER_PART_PATTERN =
7578
Pattern.compile("\\[([^\\]]+)]|\"([^\"]+)\"|`([^`]+)`|([^\\.\\s]+)");
7679

@@ -104,7 +107,9 @@ public SchemaChangeEvent resolve(SourceRecord record, List<CatalogTable> catalog
104107
return null;
105108
}
106109

107-
List<AlterTableColumnEvent> events = diffColumns(currentCatalogTable, currentTable);
110+
String ddl = SourceRecordUtils.getDdl(record);
111+
List<AlterTableColumnEvent> events =
112+
diffColumns(currentCatalogTable, currentTable, parseSpRenameColumns(ddl));
108113
if (events.isEmpty()) {
109114
log.info(
110115
"Ignoring SQL Server schema change without column diff for table {}",
@@ -114,7 +119,7 @@ public SchemaChangeEvent resolve(SourceRecord record, List<CatalogTable> catalog
114119

115120
TableIdentifier tableIdentifier = currentCatalogTable.getTableId();
116121
AlterTableColumnsEvent event = new AlterTableColumnsEvent(tableIdentifier, events);
117-
event.setStatement(SourceRecordUtils.getDdl(record));
122+
event.setStatement(ddl);
118123
event.setSourceDialectName(SOURCE_DIALECT);
119124
return event;
120125
}
@@ -163,7 +168,9 @@ && isSameIdentifier(table.id().table(), tablePath.getTableName())) {
163168
}
164169

165170
private List<AlterTableColumnEvent> diffColumns(
166-
CatalogTable currentCatalogTable, Table currentTable) {
171+
CatalogTable currentCatalogTable,
172+
Table currentTable,
173+
Map<String, String> explicitRenames) {
167174
List<Column> previousColumns = currentCatalogTable.getTableSchema().getColumns();
168175
List<io.debezium.relational.Column> newColumns = currentTable.columns();
169176
TableIdentifier tableIdentifier = currentCatalogTable.getTableId();
@@ -224,7 +231,8 @@ private List<AlterTableColumnEvent> diffColumns(
224231
}
225232
}
226233

227-
pairRenameColumns(events, tableIdentifier, newColumns, removedColumns, addedColumns);
234+
pairRenameColumns(
235+
events, tableIdentifier, newColumns, removedColumns, addedColumns, explicitRenames);
228236

229237
for (ColumnWithIndex<io.debezium.relational.Column> added : addedColumns) {
230238
Column convertedColumn = convertColumn(added.value);
@@ -244,34 +252,44 @@ private List<AlterTableColumnEvent> diffColumns(
244252
return events;
245253
}
246254

247-
/**
248-
* Attempts to pair "removed" and "added" columns as rename operations by matching on index
249-
* position and identical type/nullable/length definition.
250-
*
251-
* <p><b>Limitation:</b> the diff-based heuristic can produce false positives. If a column is
252-
* dropped and a structurally identical column is added at the same ordinal position in a single
253-
* DDL statement, this method will incorrectly emit a RENAME event instead of DROP + ADD. There
254-
* is no DDL text available at this stage to disambiguate the intent. Callers must accept this
255-
* trade-off; the emitted RENAME event is functionally safe (the downstream schema will remain
256-
* consistent) but may cause unexpected rename semantics for some sinks.
257-
*/
255+
/** Extracts explicit SQL Server column renames declared through {@code sp_rename}. */
256+
private Map<String, String> parseSpRenameColumns(String ddl) {
257+
if (StringUtils.isBlank(ddl)) {
258+
return Collections.emptyMap();
259+
}
260+
Map<String, String> renames = new LinkedHashMap<>();
261+
Matcher matcher = SP_RENAME_COLUMN_PATTERN.matcher(ddl);
262+
while (matcher.find()) {
263+
String oldColumnName = extractLastIdentifierPart(matcher.group(1));
264+
String newColumnName = extractLastIdentifierPart(matcher.group(2));
265+
if (StringUtils.isBlank(oldColumnName) || StringUtils.isBlank(newColumnName)) {
266+
continue;
267+
}
268+
renames.put(normalizeIdentifier(oldColumnName), normalizeIdentifier(newColumnName));
269+
}
270+
return renames;
271+
}
272+
258273
private void pairRenameColumns(
259274
List<AlterTableColumnEvent> events,
260275
TableIdentifier tableIdentifier,
261276
List<io.debezium.relational.Column> newColumns,
262277
List<ColumnWithIndex<Column>> removedColumns,
263-
List<ColumnWithIndex<io.debezium.relational.Column>> addedColumns) {
278+
List<ColumnWithIndex<io.debezium.relational.Column>> addedColumns,
279+
Map<String, String> explicitRenames) {
264280
Set<ColumnWithIndex<Column>> matchedRemoved = new HashSet<>();
265281
Set<ColumnWithIndex<io.debezium.relational.Column>> matchedAdded = new HashSet<>();
266282

267283
for (ColumnWithIndex<io.debezium.relational.Column> added : addedColumns) {
284+
String addedName = normalizeIdentifier(added.value.name());
268285
Column convertedAdded = convertColumn(added.value);
269286
ColumnWithIndex<Column> renameCandidate = null;
270287
for (ColumnWithIndex<Column> removed : removedColumns) {
271288
if (matchedRemoved.contains(removed)) {
272289
continue;
273290
}
274-
if (removed.index != added.index) {
291+
String removedName = normalizeIdentifier(removed.value.getName());
292+
if (!StringUtils.equalsIgnoreCase(addedName, explicitRenames.get(removedName))) {
275293
continue;
276294
}
277295
if (!sameDefinitionExceptName(removed.value, convertedAdded)) {
@@ -443,6 +461,14 @@ private boolean isSourceTypeCompatible(String oldSourceType, String newSourceTyp
443461
&& (!normalizedOld.contains("(") || !normalizedNew.contains("("));
444462
}
445463

464+
private String extractLastIdentifierPart(String identifier) {
465+
List<String> parts = parseIdentifierParts(identifier);
466+
if (!parts.isEmpty()) {
467+
return parts.get(parts.size() - 1);
468+
}
469+
return identifier;
470+
}
471+
446472
private TablePath parseTablePathFromDdl(String ddl) {
447473
if (StringUtils.isBlank(ddl)) {
448474
return null;

seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerSchemaChangeResolverTest.java

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
2626
import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
2727
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
28+
import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent;
2829
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
2930
import org.apache.seatunnel.api.table.type.BasicType;
3031
import org.apache.seatunnel.connectors.cdc.debezium.ConnectTableChangeSerializer;
@@ -114,7 +115,13 @@ void shouldResolveRenameColumnEventFromTableChanges() {
114115
Arrays.asList(
115116
intColumn("id", 1), varcharColumn("full_name", 2, 64)))
116117
.create());
117-
SourceRecord record = createRecord(tableChanges);
118+
SourceRecord record =
119+
createRecord(
120+
tableChanges,
121+
DATABASE_NAME,
122+
SCHEMA_NAME,
123+
TABLE_NAME,
124+
"EXEC sp_rename 'dbo.customers.name', 'full_name', 'COLUMN'");
118125

119126
SchemaChangeEvent event =
120127
resolver.resolve(record, Collections.singletonList(createCatalogTable()));
@@ -131,6 +138,48 @@ void shouldResolveRenameColumnEventFromTableChanges() {
131138
Assertions.assertEquals("id", changeEvent.getAfterColumn());
132139
}
133140

141+
@Test
142+
void shouldEmitDropAndAddWhenDdlIsNotExplicitRename() {
143+
TableChanges tableChanges = new TableChanges();
144+
tableChanges.alter(
145+
Table.editor()
146+
.tableId(
147+
new io.debezium.relational.TableId(
148+
DATABASE_NAME, SCHEMA_NAME, TABLE_NAME))
149+
.setPrimaryKeyNames(Collections.singletonList("id"))
150+
.setColumns(
151+
Arrays.asList(
152+
intColumn("id", 1), varcharColumn("full_name", 2, 64)))
153+
.create());
154+
SourceRecord record =
155+
createRecord(
156+
tableChanges,
157+
DATABASE_NAME,
158+
SCHEMA_NAME,
159+
TABLE_NAME,
160+
"ALTER TABLE dbo.customers DROP COLUMN name; ALTER TABLE dbo.customers ADD full_name VARCHAR(64)");
161+
162+
SchemaChangeEvent event =
163+
resolver.resolve(record, Collections.singletonList(createCatalogTable()));
164+
165+
Assertions.assertInstanceOf(AlterTableColumnsEvent.class, event);
166+
AlterTableColumnsEvent columnsEvent = (AlterTableColumnsEvent) event;
167+
Assertions.assertEquals(2, columnsEvent.getEvents().size());
168+
Assertions.assertInstanceOf(
169+
AlterTableAddColumnEvent.class, columnsEvent.getEvents().get(0));
170+
Assertions.assertInstanceOf(
171+
AlterTableDropColumnEvent.class, columnsEvent.getEvents().get(1));
172+
173+
AlterTableAddColumnEvent addEvent =
174+
(AlterTableAddColumnEvent) columnsEvent.getEvents().get(0);
175+
Assertions.assertEquals("full_name", addEvent.getColumn().getName());
176+
Assertions.assertEquals("id", addEvent.getAfterColumn());
177+
178+
AlterTableDropColumnEvent dropEvent =
179+
(AlterTableDropColumnEvent) columnsEvent.getEvents().get(1);
180+
Assertions.assertEquals("name", dropEvent.getColumn());
181+
}
182+
134183
@Test
135184
void shouldResolveSchemaChangeWhenIdentifiersAreBracketed() {
136185
SourceRecord record =

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -794,8 +794,7 @@ public void testWithSchemaEvolution(TestContainer container) {
794794
CompletableFuture.supplyAsync(
795795
() -> {
796796
try {
797-
container.executeJob(
798-
"/sqlservercdc_to_sqlserver_with_schema_change.conf");
797+
container.executeJob("/sqlservercdc_to_sqlserver_with_schema_change.conf");
799798
} catch (Exception e) {
800799
throw new RuntimeException(e);
801800
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/sqlserver_schema_change_rename_columns.sql

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,19 @@ EXEC sys.sp_cdc_enable_table
3030
@capture_instance = 'dbo_products_v4',
3131
@supports_net_changes = 0;
3232

33+
-- SQL Server exposes sp_rename as a capture-instance switch. SeaTunnel now
34+
-- treats it conservatively as ADD + DROP unless the downstream sink can
35+
-- preserve existing values itself, so force real post-switch row updates under
36+
-- the renamed column. A no-op assignment is not enough here because SQL Server
37+
-- may skip writing CDC rows when the value does not change.
38+
UPDATE dbo.products
39+
SET add_column = add_column + 1000
40+
WHERE id IN (101, 103, 104, 105, 106, 107, 108, 109, 110, 120, 121, 128, 129, 130, 131, 140, 141);
41+
42+
UPDATE dbo.products
43+
SET add_column = add_column - 1000
44+
WHERE id IN (101, 103, 104, 105, 106, 107, 108, 109, 110, 120, 121, 128, 129, 130, 131, 140, 141);
45+
3346
DELETE FROM dbo.products WHERE id = 130;
3447
INSERT INTO dbo.products VALUES (150, 'scooter', 'Small 2-wheel scooter', 3.14, 1);
3548
INSERT INTO dbo.products VALUES (151, 'car battery', '12V car battery', 8.1, 2);

0 commit comments

Comments
 (0)