[FLINK-39252][pipeline-connector/sqlserver] Add SQL Server pipeline connector #4320

Merged
leonardBang merged 2 commits into apache:master from Daishuyuan:FLINK-39252
Jun 16, 2026
Daishuyuan commented Mar 17, 2026

Contributor

What is changed

  • add a new SQL Server pipeline connector module with source provider, event deserializer, metadata accessor, schema/type utilities and factory wiring
  • integrate the new module into the pipeline connectors build and register the connector factory service
  • add coverage for full types, metadata access, online schema migration, parallel snapshot, newly added tables and table pattern matching
  • add the source-connector side updates needed by the pipeline connector integration

Why

Flink CDC currently has a source SQL Server connector but does not provide the corresponding pipeline connector. This change adds SQL Server pipeline connector support so SQL Server can be used in pipeline jobs consistently with other databases.

Daishuyuan commented Mar 19, 2026

Contributor Author

Hi @lvyanquan, @loserwang1024, could you help review this PR when you have time?

I have addressed the previous comment and updated the SQL Server pipeline connector implementation and tests accordingly. Thanks.

@Daishuyuan

Contributor Author

Hi @yuxiqian, @leonardBang, this PR adds the SQL Server pipeline connector and also includes several source-side SQL Server changes. Could you help take a look when you have time? Thanks.

@leonardBang

Contributor

Hi @yuxiqian, @leonardBang, this PR adds the SQL Server pipeline connector and also includes several source-side SQL Server changes. Could you help take a look when you have time? Thanks.

@Daishuyuan Thanks for the contribution! However, we're going to code freeze soon and don't have enough committers to help review this PR. Since this feature was not included in the 3.6 roadmap, we'd probably like to deliver it in the next version.

Daishuyuan requested a review from zml1206 on March 26, 2026 06:21
leonardBang requested review from Copilot and removed request for zml1206 on April 7, 2026 03:16

Copilot AI left a comment

Contributor

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Adds a new SQL Server pipeline connector to Flink CDC and extends SQL Server CDC source/pipeline integration to support additional behaviors like timestamp startup, metadata access, schema/type utilities, and broader integration test coverage.

Changes:

  • Added SQL Server pipeline connector module and registered its DataSourceFactory via service loader + build wiring.
  • Added "timestamp" startup mode support (table factory + pipeline datasource) and implemented timestamp→LSN offset creation for SQL Server.
  • Added extensive unit/integration tests for metadata access, full type coverage, schema migration, parallel snapshot behavior, newly added tables, and table pattern matching.

Reviewed changes

Copilot reviewed 35 out of 35 changed files in this pull request and generated 12 comments.

| File | Description |
| --- | --- |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactoryTest.java | Adds table-factory tests for timestamp startup mode validation. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactoryTest.java | Adds unit coverage for timestamp startup option propagation. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactory.java | Adds timestamp startup option parsing + option registration. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/offset/LsnFactory.java | Implements timestamp→LSN mapping and logging for timestamp startup. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactory.java | Accepts TIMESTAMP and SNAPSHOT startup modes; wires scanNewlyAddedTableEnabled. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/config/SqlServerSourceConfig.java | Plumbs scanNewlyAddedTableEnabled into base config. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceBuilder.java | Passes source config into LsnFactory to enable timestamp offset creation. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java | Adds handling for SQL Server error code 313 during CDC window queries. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml | Adds new pipeline connector module to the build. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/resources/log4j2-test.properties | Test logging configuration for the new module. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/resources/ddl/inventory.sql | SQL Server test dataset for inventory scenarios. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/resources/ddl/column_type_test.sql | SQL Server test dataset for full type coverage. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/testutils/SqlServerSourceTestUtils.java | Adds shared test utilities for result collection + polling. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/testutils/RecordDataTestUtils.java | Adds test helper for extracting RecordData fields with type-aware getters. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerTablePatternMatchingTest.java | Adds tests for include/exclude table pattern matching. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerPipelineNewlyAddedTableITCase.java | Adds IT coverage for scanning newly added tables across savepoint restore. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerPipelineITCaseTest.java | Adds pipeline source IT cases for startup modes and metadata. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerParallelizedPipelineITCase.java | Adds mini-cluster pipeline IT tests for parallel snapshot + streaming. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerOnlineSchemaMigrationITCase.java | Adds IT coverage for online schema changes (add/alter/drop column). |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerMetadataAccessorITCase.java | Adds IT coverage for namespaces/schemas/tables/schema retrieval. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerFullTypesITCase.java | Adds IT coverage for reading and validating a broad SQL Server type set. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerEventDeserializerTest.java | Adds unit tests for schema-change event deserialization. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/factory/SqlServerDataSourceFactoryTest.java | Adds unit tests for factory options, pattern matching, and timestamp startup validation. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory | Registers SQL Server pipeline factory via service loader. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/utils/SqlServerTypeUtils.java | Adds Debezium column → Flink CDC type conversion utilities. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/utils/SqlServerSchemaUtils.java | Adds schema/table discovery, schema conversion, SQL Server Agent validation. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/SqlServerPipelineRecordEmitter.java | Emits CreateTableEvent(s) appropriately for pipeline mode + caches schemas. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSchemaDataTypeInference.java | Adds SQL Server-specific schema inference type (currently default behavior). |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerPipelineSource.java | Adds pipeline source variant using the pipeline record emitter. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerMetadataAccessor.java | Adds MetadataAccessor for SQL Server pipeline connector. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerEventDeserializer.java | Adds pipeline deserializer for data changes + schema changes + metadata. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerDataSourceOptions.java | Adds pipeline connector configuration options. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerDataSource.java | Adds SQL Server pipeline DataSource wiring deserializer/source/metadata accessor. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/factory/SqlServerDataSourceFactory.java | Adds factory: validation, option parsing, table pattern resolution, startup modes. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/pom.xml | New module POM with runtime/test dependencies and shading config. |

Comment on lines +59 to +61
return SqlServerSchemaUtils.listSchemas(sourceConfig, namespace);
}

Copilot AI Apr 7, 2026

listSchemas(@Nullable String namespace) forwards a nullable value into SqlServerSchemaUtils.listSchemas(...), which currently formats SQL using quote(namespace) and will NPE (or query [null]). Handle namespace == null explicitly (e.g., default to the configured database from sourceConfig.getDatabaseList() or throw a ValidationException with a clear message).

Suggested change
- return SqlServerSchemaUtils.listSchemas(sourceConfig, namespace);
- }
+ return SqlServerSchemaUtils.listSchemas(sourceConfig, resolveNamespace(namespace));
+ }
+ private String resolveNamespace(@Nullable String namespace) {
+     if (namespace != null) {
+         return namespace;
+     }
+     List<String> configuredDatabases = sourceConfig.getDatabaseList();
+     if (configuredDatabases != null && !configuredDatabases.isEmpty()) {
+         return configuredDatabases.get(0);
+     }
+     throw new IllegalArgumentException(
+             "Namespace must not be null when listing SQL Server schemas and no "
+                     + "database is configured in the source configuration.");
+ }

Contributor Author

done.

Comment on lines +333 to +348
private static <T> List<T> fetchResultsExcept(Iterator<T> iter, int size, T sideEvent) {
List<T> result = new ArrayList<>(size);
List<T> sideResults = new ArrayList<>();
while (size > 0 && iter.hasNext()) {
T event = iter.next();
if (sideEvent.getClass().isInstance(event)) {
sideResults.add(sideEvent);
} else {
result.add(event);
size--;
}
}
// Also ensure we've received at least one or many side events.
assertThat(sideResults).isNotEmpty();
return result;
}

Copilot AI Apr 7, 2026

When a side event is encountered, the code adds the sideEvent parameter rather than the actual event read from the iterator (sideResults.add(sideEvent)). This is logically incorrect and can hide issues if the test later inspects side events. Add the actual event instead.

Contributor Author

done.
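
A minimal sketch of the corrected collection logic from this thread, assuming a simplified helper that returns both lists instead of asserting on them. `SideEventFilter`, `Split`, and `fetchExcept` are hypothetical names used for illustration only; they are not the test utility in the PR.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class SideEventFilter {
    /** Holds the main events and the side events that were filtered out. */
    static final class Split<T> {
        final List<T> main = new ArrayList<>();
        final List<T> side = new ArrayList<>();
    }

    /**
     * Reads from the iterator until `size` main events are collected.
     * Events matching sideType are stored separately.
     */
    static <T> Split<T> fetchExcept(Iterator<T> iter, int size, Class<? extends T> sideType) {
        Split<T> split = new Split<>();
        while (split.main.size() < size && iter.hasNext()) {
            T event = iter.next();
            if (sideType.isInstance(event)) {
                // Keep the event that was actually read, not a placeholder argument.
                split.side.add(event);
            } else {
                split.main.add(event);
            }
        }
        return split;
    }
}
```

Because the side list holds the events that were really read, a test can inspect their contents rather than only checking that the list is non-empty.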

}
}
}
return null;

Copilot AI Apr 7, 2026

If the savepoint never succeeds after retries, the method returns null, and the test proceeds as if it had a savepoint path. This can lead to misleading failures (or restoring without a savepoint). Prefer throwing an exception (e.g., TimeoutException/AssertionError) once retries are exhausted so the test fails fast with a clear reason.

Suggested change
- return null;
+ throw new AssertionError(
+         String.format(
+                 "Failed to trigger savepoint in directory '%s' after %d retries.",
+                 savepointDirectory, retryTimes));

Contributor Author

done.

Comment on lines +82 to +83
case Types.FLOAT:
return DataTypes.FLOAT();

Copilot AI Apr 7, 2026

JDBC Types.FLOAT typically represents a double-precision floating type (while Types.REAL is single-precision). Mapping both REAL and FLOAT to DataTypes.FLOAT() can lose precision for SQL Server FLOAT columns and contradicts the expected behavior in the added type IT coverage. Consider mapping Types.REAL -> DataTypes.FLOAT() and Types.FLOAT -> DataTypes.DOUBLE() (or use Debezium column metadata to decide).

Suggested change
- case Types.FLOAT:
- return DataTypes.FLOAT();
+ return DataTypes.FLOAT();
+ case Types.FLOAT:
Contributor Author

done.
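
The REAL / FLOAT distinction raised in this thread can be sketched as follows. This is an illustration under stated assumptions: `FloatMappingSketch` and `TargetType` are hypothetical stand-ins, not the Flink CDC `DataTypes` API or the code in SqlServerTypeUtils.java, and only the three floating-point JDBC types are handled.

```java
import java.sql.Types;

final class FloatMappingSketch {
    /** Stand-in for the connector's target types; hypothetical, not the Flink CDC API. */
    enum TargetType { FLOAT, DOUBLE }

    /** Maps single-precision REAL to FLOAT and double-precision FLOAT/DOUBLE to DOUBLE. */
    static TargetType map(int jdbcType) {
        switch (jdbcType) {
            case Types.REAL:
                return TargetType.FLOAT;
            case Types.FLOAT:
            case Types.DOUBLE:
                return TargetType.DOUBLE;
            default:
                throw new IllegalArgumentException("Unsupported JDBC type: " + jdbcType);
        }
    }
}
```

Letting `Types.FLOAT` share the `Types.DOUBLE` branch keeps double-precision values from being narrowed to single precision.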


Map<String, String> configMap = config.toMap();
mergeJdbcPropertiesIntoDebeziumProperties(configMap);
String databaseName = getValidateDatabaseName(tables).orElseThrow();

Copilot AI Apr 7, 2026

Two issues in database-name extraction/validation:
1) If all entries are skipped (e.g., due to format issues), getValidateDatabaseName(...) can return Optional.empty(), and the caller uses .orElseThrow() without a message, resulting in a NoSuchElementException that doesn't explain the config problem.
2) The length validation error message uses TABLES.key() (the literal key name) instead of the invalid value (e.g., trimmedTableName or the full tables string), making troubleshooting harder.
Recommendation: treat any non-db.schema.table entry as a configuration error (fail fast with ValidationException/IllegalArgumentException containing the actual option value) and avoid returning empty when the option is required.

Contributor Author

I have removed "orElseThrow" and now handle this case in another way.

Comment on lines +393 to +397
checkState(
tableNameParts.length == 3,
String.format(
"Tables format must db.schema.table, can not 'tables' = %s",
TABLES.key()));

Copilot AI Apr 7, 2026

Two issues in database-name extraction/validation:
1) If all entries are skipped (e.g., due to format issues), getValidateDatabaseName(...) can return Optional.empty(), and the caller uses .orElseThrow() without a message, resulting in a NoSuchElementException that doesn't explain the config problem.
2) The length validation error message uses TABLES.key() (the literal key name) instead of the invalid value (e.g., trimmedTableName or the full tables string), making troubleshooting harder.
Recommendation: treat any non-db.schema.table entry as a configuration error (fail fast with ValidationException/IllegalArgumentException containing the actual option value) and avoid returning empty when the option is required.

Contributor Author

done.

.stringType()
.noDefaultValue()
.withDescription(
"Name of the SQL Server database to use when connecting to the SQL Server database server.");

Copilot AI Apr 7, 2026

The username option description is incorrect: it describes a database name rather than the user name. Update the description to avoid confusing users configuring the connector.

Suggested change
- "Name of the SQL Server database to use when connecting to the SQL Server database server.");
+ "Name of the SQL Server user to use when connecting to the SQL Server database server.");

Contributor Author

done.

TimeUnit.MILLISECONDS.sleep(interval.toMillis());
}
throw new TimeoutException(
"Ran out of time when waiting for " + description + " to success.");

Copilot AI Apr 7, 2026

Fix grammar in the timeout message: replace 'to success' with 'to succeed'.

Suggested change
- "Ran out of time when waiting for " + description + " to success.");
+ "Ran out of time when waiting for " + description + " to succeed.");

Contributor Author

done.

Comment on lines +32 to +35
// SQL Server doesn't have special geometry types like PostgreSQL,
// so we can use the default implementation from the parent class.
// If there are SQL Server specific types that need special handling,
// they can be added here by overriding the inferStruct method.

Copilot AI Apr 7, 2026

The comment is misleading: SQL Server does have spatial types (geometry, geography) and other special types (e.g., hierarchyid). If the intent is 'no special handling needed currently', consider rephrasing to avoid stating that these types don't exist.

Suggested change
- // SQL Server doesn't have special geometry types like PostgreSQL,
- // so we can use the default implementation from the parent class.
- // If there are SQL Server specific types that need special handling,
- // they can be added here by overriding the inferStruct method.
+ // SQL Server has database-specific types, but no special handling is currently
+ // needed here, so this class uses the default implementation from the parent class.
+ // If SQL Server-specific types require special handling in the future,
+ // it can be added here by overriding the inferStruct method.

Contributor Author

done.

INSERT INTO products(name,description,weight)
VALUES ('rocks','box of assorted rocks',5.3);
INSERT INTO products(name,description,weight)
VALUES ('jacket','water resistent black wind breaker',0.1);

Copilot AI Apr 7, 2026

Typo in sample data: 'resistent' should be 'resistant'. If any assertions depend on the exact string, update those expected values accordingly.

Suggested change
- VALUES ('jacket','water resistent black wind breaker',0.1);
+ VALUES ('jacket','water resistant black wind breaker',0.1);

Contributor Author

done.

int port = config.get(PORT);
String username = config.get(USERNAME);
String password = config.get(PASSWORD);
String chunkKeyColumn = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);

[Suggestion] SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN is read from config at this line and passed to configFactory.chunkKeyColumn(), but it is NOT declared in optionalOptions(). This means FactoryHelper.validateExcept() will reject any user-provided value as an "unknown option," making the chunk key column configuration inaccessible to users.

Suggested fix: Add options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN); to optionalOptions(), or remove the config read and .chunkKeyColumn() call if the option is intentionally unsupported.

— glm-5.1

Contributor Author

done.
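
To illustrate why an undeclared option becomes unusable, here is a simplified stand-in for the kind of check the comment describes. `OptionValidationSketch` and `validate` are hypothetical and only model the idea of rejecting keys that the factory did not declare; the real check is performed by the factory helper and may differ in detail.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class OptionValidationSketch {
    /** Rejects any supplied key that is not in the declared set. */
    static void validate(Map<String, String> supplied, Set<String> declared) {
        Set<String> unknown = new HashSet<>(supplied.keySet());
        unknown.removeAll(declared);
        if (!unknown.isEmpty()) {
            throw new IllegalArgumentException("Unsupported options: " + unknown);
        }
    }
}
```

Under this model, a user-supplied `scan.incremental.snapshot.chunk.key-column` is rejected until the key is added to the declared set, which mirrors the effect of adding the option to optionalOptions().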


Map<String, String> configMap = config.toMap();
mergeJdbcPropertiesIntoDebeziumProperties(configMap);
String databaseName = getValidateDatabaseName(tables).orElseThrow();

[Suggestion] getValidateDatabaseName(tables).orElseThrow() throws a bare NoSuchElementException with no contextual message when the tables config has no entries matching the db.schema.table format. This makes debugging difficult for users.

Suggested fix:

String databaseName =
    getValidateDatabaseName(tables)
        .orElseThrow(() -> new IllegalArgumentException(
            "Cannot determine database name from 'tables' option: " + tables
            + ". Expected format: database.schema.table"));

— glm-5.1

Contributor Author

done.

.withDescription(
"Optional timestamp used in case of \"timestamp\" startup mode");

public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =

[Suggestion] SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND/LOWER_BOUND are defined with identical keys, defaults, and fallback keys as CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND/LOWER_BOUND. The factory reads values from CHUNK_KEY_* but error messages reference SPLIT_KEY_*, creating confusing inconsistency. Other pipeline connectors (e.g., Oracle) only have the CHUNK_KEY_* variants.

Suggested fix: Remove SPLIT_KEY_* variants and update error messages in validateDistributionFactorUpper/validateDistributionFactorLower to use CHUNK_KEY_*.key().

— glm-5.1

Contributor Author

done.

trimmedTableName.split(
"(?<!\\\\)\\.", -1); // Use -1 to avoid ignoring trailing empty elements

checkState(

[Suggestion] isValidSqlServerDbName uses regex [a-zA-Z_@#][a-zA-Z0-9_@#$]* which rejects valid SQL Server identifiers that use bracket-quoting with hyphens, spaces, or other special characters (e.g., [my-database]). Real-world databases can have such names.

Suggested fix: Either relax isValidSqlServerDbName to accept a wider set of characters matching SQL Server's actual naming rules, or remove this validation and rely on the downstream JDBC connection failing if the name is truly invalid.

— glm-5.1

Contributor Author

done.

"Tables format must db.schema.table, can not 'tables' = %s",
TABLES.key()));
String currentDbName = tableNameParts[0];

[Suggestion] The error message "Tables format must db.schema.table, can not 'tables' = %s" is grammatically broken (missing "be" after "must") and the format argument is TABLES.key() (the option name "tables") rather than the actual malformed table identifier. This makes the error unhelpful for users diagnosing what went wrong.

Suggested fix:

checkState(
    tableNameParts.length == 3,
    String.format(
        "Table '%s' does not match the expected 'database.schema.table' format. "
            + "Please check the value of option '%s'.",
        trimmedTableName, TABLES.key()));

— glm-5.1

Contributor Author

done.
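
Putting the split and the improved message together, a self-contained sketch of the validation might look like this. It reuses the split regex quoted in the review context; `TableIdSketch` and `parse` are hypothetical names, and `IllegalArgumentException` stands in for whatever exception type the factory actually uses.

```java
final class TableIdSketch {
    /**
     * Splits on dots that are not preceded by a backslash and requires
     * exactly three parts: database, schema, table.
     */
    static String[] parse(String tableName) {
        String trimmed = tableName.trim();
        // -1 keeps trailing empty elements, so "db.schema." still yields three parts.
        String[] parts = trimmed.split("(?<!\\\\)\\.", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                    String.format(
                            "Table '%s' does not match the expected "
                                    + "'database.schema.table' format.",
                            trimmed));
        }
        return parts;
    }
}
```

The point of the change is that the message carries the offending value, so a user who writes `dbo.products` sees that exact string in the error instead of the option key.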

new EventTypeInfo())
.executeAndCollect();

Thread.sleep(5_000);

[Suggestion] Thread.sleep(5_000) is used as a synchronization barrier without any condition check. If the source hasn't finished the snapshot within 5 seconds, subsequent operations may read events out of order. Conversely, on fast machines this wastes time. The same pattern appears at line 128 and in SqlServerPipelineNewlyAddedTableITCase.java:327 with Thread.sleep(1000L).

Suggested fix: Replace with a polling loop (e.g., SqlServerSourceTestUtils.loopCheck) that waits until the expected condition is met, similar to the pattern already used in SqlServerParallelizedPipelineITCase.

— glm-5.1

Contributor Author

done.
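
A condition-based wait of the kind suggested here can be sketched as below. `PollingSketch.waitUntil` is a hypothetical helper written for illustration; the signature of the PR's `SqlServerSourceTestUtils.loopCheck` is not shown in this conversation and may differ.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

final class PollingSketch {
    /** Polls the condition until it holds or the timeout elapses. */
    static void waitUntil(
            BooleanSupplier condition, String description, Duration timeout, Duration interval)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return; // Return as soon as the condition is met.
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException(
                        "Ran out of time when waiting for " + description + " to succeed.");
            }
            TimeUnit.MILLISECONDS.sleep(interval.toMillis());
        }
    }
}
```

Compared with a fixed `Thread.sleep`, this returns immediately on fast machines and fails with a descriptive message on slow ones instead of silently proceeding.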

return namespaceNames;
}

public static String quote(String dbOrTableName) {

[Suggestion] The quote() method wraps identifiers in [...] but does not escape embedded ] characters as ]]. The source connector module's SqlServerUtils.quote() correctly does "[" + name.replace("]", "]]") + "]". This inconsistency means database names containing ] would produce malformed SQL in the pipeline module's listSchemas() method.

Suggested fix:

return "[" + dbOrTableName.replace("]", "]]") + "]";

— glm-5.1

Contributor Author

done.
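
For reference, the escaping rule from this thread in runnable form. `QuoteSketch` is a hypothetical wrapper class; the body of `quote` is the one-line fix suggested in the review.

```java
final class QuoteSketch {
    /** Wraps an identifier in brackets, doubling any embedded closing bracket. */
    static String quote(String dbOrTableName) {
        return "[" + dbOrTableName.replace("]", "]]") + "]";
    }
}
```

With this rule, `quote("inventory")` yields `[inventory]` and `quote("a]b")` yields `[a]]b]`, so an embedded `]` no longer terminates the quoted identifier early.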

wenshao left a comment

Review: SQL Server Pipeline Connector

Reviewed the full diff (35 files, ~6000 lines). The implementation follows established patterns from other pipeline connectors (MySQL, PostgreSQL, Oracle) and is well-structured overall.

Summary

7 inline suggestions posted covering:

  • Missing optionalOptions() declaration for chunk key column config
  • Bare NoSuchElementException without contextual message
  • Duplicate config option constants (SPLIT_KEY_* vs CHUNK_KEY_*)
  • Overly strict database name validation regex
  • Grammatically broken error message
  • Thread.sleep without condition checks in tests (flaky risk)
  • quote() method not escaping ] as ]]

Additionally, there is a potential issue worth a closer look: the error 313 handling in SqlServerStreamingChangeEventSource.java may not advance the LSN position, which could cause an infinite retry loop if the error is persistent (CDC data purged).

Reviewed by glm-5.1 via Qwen Code /review

@Daishuyuan

Contributor Author

Thanks for the review. I addressed the comments in the latest commits.

Updated:

  • added null namespace fallback handling in the metadata accessor
  • registered scan.incremental.snapshot.chunk.key-column
  • improved tables validation and related error messages
  • relaxed database name validation for SQL Server
  • fixed chunk distribution factor error messages to use CHUNK_KEY_*
  • fixed bracket escaping in identifier quoting
  • corrected REAL / FLOAT type mapping
  • cleaned up a few test and wording issues raised in review
  • added regression tests for the above cases

I also updated the SQL Server error 313 handling in the source connector. Instead of retrying with the same stale LSN, it now fails fast with a clear message that the CDC range is no longer available and suggests re-snapshotting or advancing to the minimum available LSN.
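
The fail-fast behaviour described above could look roughly like the following sketch. It is not the connector's code: the class name, method name, and message wording are assumptions, and only the error number 313 comes from the review discussion.

```java
import java.sql.SQLException;

/** Hypothetical sketch; class, method, and message text are illustrative only. */
final class CdcRangeErrors {

    /** Error number the review associates with a CDC range that is no longer available. */
    static final int CDC_RANGE_UNAVAILABLE = 313;

    private CdcRangeErrors() {}

    /** Translates a SQLException into an exception to throw instead of retrying with the same LSN. */
    static RuntimeException failFast(SQLException e, String requestedLsn) {
        if (e.getErrorCode() == CDC_RANGE_UNAVAILABLE) {
            return new IllegalStateException(
                    "CDC changes for LSN " + requestedLsn + " are no longer available; "
                            + "re-snapshot or restart from the minimum available LSN.",
                    e);
        }
        // Any other error is passed on unchanged in meaning.
        return new RuntimeException(e);
    }

    public static void main(String[] args) {
        SQLException purged = new SQLException("range purged", "S0001", CDC_RANGE_UNAVAILABLE);
        System.out.println(failFast(purged, "0x01").getMessage());
    }
}
```

The point of the design is that a stale LSN cannot become valid again by waiting, so surfacing the problem immediately is more useful than looping.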

SCAN_STARTUP_TIMESTAMP_MILLIS.key()));
}
return StartupOptions.timestamp(startupTimestampMillis);

[Critical] scan.startup.mode=timestamp is accepted here, but the non-parallel execution path still delegates to SqlServerSource.Builder.build(), whose startup-mode switch only supports INITIAL and LATEST_OFFSET and otherwise throws UnsupportedOperationException. As a result, a configuration that passes validation can still fail at runtime when scan.incremental.snapshot.enabled remains false. Please either reject timestamp mode for the non-parallel path or add matching runtime support there.

— gpt-5.4 via Qwen Code /review

Contributor Author

Thanks for catching this. I fixed it in the latest update.

scan.startup.mode=timestamp is now explicitly validated together with scan.incremental.snapshot.enabled, and the factory throws a ValidationException for the legacy non-parallel path instead of letting it fail later at runtime. I also added a test to cover that timestamp startup requires incremental snapshot mode.
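
As a rough illustration of the combined check described in this reply, the sketch below rejects timestamp startup when incremental snapshot is disabled. The class and method are hypothetical, and IllegalArgumentException stands in for Flink's ValidationException so the example stays self-contained.

```java
/** Hypothetical validation sketch; not the factory code from the PR. */
final class StartupModeValidation {

    private StartupModeValidation() {}

    /** Rejects timestamp startup unless the incremental snapshot path is enabled. */
    static void validate(String startupMode, boolean incrementalSnapshotEnabled) {
        if ("timestamp".equalsIgnoreCase(startupMode) && !incrementalSnapshotEnabled) {
            throw new IllegalArgumentException(
                    "scan.startup.mode=timestamp requires scan.incremental.snapshot.enabled=true");
        }
    }

    public static void main(String[] args) {
        validate("initial", false);   // accepted
        validate("timestamp", true);  // accepted
        try {
            validate("timestamp", false);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Checking both options together at configuration time moves the failure from job runtime to job submission.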

Timestamp.valueOf(
Instant.ofEpochMilli(timestampMillis)
.atZone(serverZoneId)
.toLocalDateTime());

[Critical] startupTimestamp is converted using serverTimeZone, but it is then bound with PreparedStatement#setTimestamp(...) without an explicit Calendar. JDBC will interpret that timestamp in the JVM default timezone instead of the configured SQL Server timezone, so scan.startup.mode=timestamp can resolve to the wrong LSN when those timezones differ. Please bind the query parameter with a timezone-aware Calendar (or otherwise preserve the configured server timezone when passing the timestamp to JDBC).

— gpt-5.4 via Qwen Code /review

Contributor Author

Thanks for pointing this out. I fixed it in the latest update.

The timestamp is now bound with an explicit Calendar created from the configured serverTimeZone, so the JDBC parameter keeps the SQL Server timezone semantics instead of falling back to the JVM default timezone. I also added an integration test that sets the JVM default timezone differently from the configured server timezone to verify the timestamp startup path.
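
One way to bind a timestamp with an explicit Calendar is sketched below. This is an assumption about a workable approach, not the PR's code: the class and method names are hypothetical, and the sketch passes the instant itself to JDBC and lets the Calendar carry the server time zone.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Calendar;
import java.util.TimeZone;

/** Hypothetical sketch of time-zone-aware parameter binding. */
final class TimestampBinding {

    private TimestampBinding() {}

    /** Builds a Calendar in the configured server time zone. */
    static Calendar calendarFor(ZoneId serverZoneId) {
        return Calendar.getInstance(TimeZone.getTimeZone(serverZoneId));
    }

    /** Binds the startup instant so the driver renders it in the server zone, not the JVM default. */
    static void bindStartupTimestamp(
            PreparedStatement statement, int index, long timestampMillis, ZoneId serverZoneId)
            throws SQLException {
        Timestamp instant = Timestamp.from(Instant.ofEpochMilli(timestampMillis));
        statement.setTimestamp(index, instant, calendarFor(serverZoneId));
    }

    public static void main(String[] args) {
        System.out.println(calendarFor(ZoneId.of("Asia/Shanghai")).getTimeZone().getID());
    }
}
```

The three-argument PreparedStatement#setTimestamp overload is standard JDBC; how a given driver applies the Calendar is worth confirming against its documentation.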

@SendDreams

I hope the SQL Server pipeline source will support Flink 1.20.

@Daishuyuan

Contributor Author

Hi @leonardBang, I’ve rebased onto master. Could you please help trigger the CI? Thank you!

@Daishuyuan

Contributor Author

It seems that the CI timeout is unrelated to the PR, and at the moment we’re not sure if there’s a way to move forward. @leonardBang

@leonardBang leonardBang left a comment

Contributor

Thanks @Daishuyuan for the substantial work here. This is a well-structured, near-complete connector that closely follows the Postgres pipeline-connector blueprint (Factory / DataSource / EventDeserializer / MetadataAccessor / RecordEmitter / SchemaUtils / TypeUtils). The overall direction is right, and I left only a few comments.

Btw, this PR adds a new pipeline connector but does not add user-facing docs (docs/content/docs/connectors/pipeline-connectors/sqlserver.md, zh doc, and overview updates). Please add documentation in this PR or open a subtask to track.

return DataTypes.STRING();
case MONEY:
case SMALL_MONEY:
return DataTypes.DECIMAL(10, 4);

Contributor

money and smallmoney have different ranges and must not share the same mapping. SQL Server money is an 8-byte type with range ±922,337,203,685,477.5807, i.e. DECIMAL(19, 4); only smallmoney (a 4-byte type with range ±214,748.3647) fits in DECIMAL(10, 4). Mapping money to DECIMAL(10, 4) will overflow or truncate any value with more than 10 total digits and silently corrupt data. Please split them:

case MONEY:
    return DataTypes.DECIMAL(19, 4);
case SMALL_MONEY:
    return DataTypes.DECIMAL(10, 4);

Could you also add these two columns (with large money values) to SqlServerFullTypesITCase so the mapping is covered?
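
The digit counts behind this comment can be checked with plain BigDecimal arithmetic. The sketch below is only an illustration; the class and the fits helper are hypothetical and do not exist in the connector.

```java
import java.math.BigDecimal;

/** Hypothetical check of whether a value fits a DECIMAL(precision, scale) column. */
final class MoneyPrecisionCheck {

    private MoneyPrecisionCheck() {}

    /** Returns true if the value can be stored in DECIMAL(precision, scale) without losing digits. */
    static boolean fits(BigDecimal value, int precision, int scale) {
        BigDecimal normalized = value.stripTrailingZeros();
        if (normalized.scale() > scale) {
            return false; // more fractional digits than the column keeps
        }
        // Padding to the target scale only appends zeros, so no rounding occurs.
        return normalized.setScale(scale).precision() <= precision;
    }

    public static void main(String[] args) {
        BigDecimal moneyMax = new BigDecimal("922337203685477.5807");
        BigDecimal smallMoneyMax = new BigDecimal("214748.3647");
        System.out.println(fits(moneyMax, 10, 4));
        System.out.println(fits(moneyMax, 19, 4));
        System.out.println(fits(smallMoneyMax, 10, 4));
    }
}
```

The largest money value has 15 integer digits plus 4 fractional digits, which is why it needs a precision of 19, while the largest smallmoney value needs exactly 10.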

this.alreadySendCreateTableTables = new HashSet<>();
this.createTableEventCache = new HashMap<>();
this.isBounded = StartupOptions.snapshot().equals(sourceConfig.getStartupOptions());
generateCreateTableEvents();

Contributor

Similar concern as in the deserializer: this introduces another connector-local create-table cache instead of reusing the framework cache / split-restoration mechanism already used by other pipeline connectors.

* {@link #initializeTableSchemaCacheFromSplitSchemas(Map)}). It must not be {@code final}
* because Java deserialization bypasses field initializers for {@code transient} fields.
*/
private transient Map<TableId, Schema> tableSchemaCache;

Contributor

We already have framework-level schema cache support in DebeziumEventDeserializationSchema via getCreateTableEventCache() / applyCreateTableEvent(). Re-introducing a connector-local tableSchemaCache here makes SQL Server diverge from the pattern used by Postgres and creates another state recovery path to maintain. Could we reuse the framework cache instead?
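
The Javadoc fragment quoted above the comment notes that Java deserialization bypasses field initializers for transient fields, which is part of why a connector-local cache adds a recovery path to maintain. The self-contained sketch below demonstrates that behaviour; the Holder class is purely illustrative and unrelated to the connector's classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Illustrative demo: a transient field is null after deserialization, despite its initializer. */
final class TransientFieldDemo {

    static final class Holder implements Serializable {
        private static final long serialVersionUID = 1L;

        // The initializer runs for "new Holder()" but not when the object is deserialized.
        transient Map<String, String> cache = new HashMap<>();

        /** Lazily recreates the cache, which is what restored instances need. */
        Map<String, String> cache() {
            if (cache == null) {
                cache = new HashMap<>();
            }
            return cache;
        }
    }

    /** Serializes and deserializes the holder in memory. */
    static Holder roundTrip(Holder original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Holder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Holder restored = roundTrip(new Holder());
        System.out.println("cache is null after restore: " + (restored.cache == null));
    }
}
```

Every such field needs its own re-initialization logic after restore, which is the maintenance cost that reusing a shared framework cache would avoid.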

+ "It is important to note that the dot (.) is treated as a delimiter for database, schema and table names. "
+ "If there is a need to use a dot (.) in a regular expression to match any character, "
+ "it is necessary to escape the dot with a backslash."
+ "eg. db0.dbo.\\.*, db1.dbo.user_table_[0-9]+, db[1-2].dbo.[app|web]_order_\\.*");

Contributor

This example suggests database-level regex / multi-database matching is supported, but the factory validation below requires a single consistent literal database name. Please align the option description with the actual current capability.
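
To make the mismatch concrete, the sketch below splits a tables pattern on unescaped dots and checks whether the database part is a plain literal. Everything here is an assumption for illustration: the class, the method names, and in particular the literal-name rule are hypothetical and do not reproduce the factory's actual validation.

```java
import java.util.regex.Pattern;

/** Hypothetical sketch of splitting a "database.schema.table" pattern. */
final class TablePatternParts {

    /** Matches a dot that is not preceded by a backslash, i.e. a delimiter dot. */
    private static final Pattern UNESCAPED_DOT = Pattern.compile("(?<!\\\\)\\.");

    /** Assumed rule for a literal database name; the real validation may differ. */
    private static final Pattern LITERAL_NAME = Pattern.compile("[A-Za-z0-9_]+");

    private TablePatternParts() {}

    /** Splits into at most three parts: database, schema, and the table expression. */
    static String[] split(String pattern) {
        return UNESCAPED_DOT.split(pattern, 3);
    }

    /** Returns true if the pattern has three parts and the database part contains no regex syntax. */
    static boolean hasLiteralDatabase(String pattern) {
        String[] parts = split(pattern);
        return parts.length == 3 && LITERAL_NAME.matcher(parts[0]).matches();
    }

    public static void main(String[] args) {
        System.out.println(hasLiteralDatabase("db0.dbo.\\.*"));
        System.out.println(hasLiteralDatabase("db[1-2].dbo.[app|web]_order_\\.*"));
    }
}
```

Under this assumed rule, the first two examples from the option description pass, while the db[1-2] example is rejected, which is the inconsistency the comment points out.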

+ "It is important to note that the dot (.) is treated as a delimiter for database, schema and table names. "
+ "If there is a need to use a dot (.) in a regular expression to match any character, "
+ "it is necessary to escape the dot with a backslash."
+ "eg. db0.dbo.\\.*, db1.dbo.user_table_[0-9]+, db[1-2].dbo.[app|web]_order_\\.*");

Contributor

Same comment as tables: this example appears to advertise cross-database matching, but the current factory logic does not support that.

@leonardBang

Contributor

Hey @Daishuyuan, could you take a look at my comments when you have time?

@Daishuyuan

Contributor Author

Thanks for the reminder, @leonardBang, and sorry for the delay. I’ll go through your comments carefully and address the remaining issues in the next update, including the type mapping, schema-cache handling, option descriptions, and docs/follow-up task. Thanks again for the review!

@leonardBang

Contributor

No need to apologize, @Daishuyuan. I'm asking only because I reviewed this meaningful PR last week and made some progress with AI assistance: https://github.com/leonardbang/flink-cdc/tree/fork/Daishuyuan/FLINK-39252. Maybe you can refer to those changes too.

@github-actions github-actions Bot added the docs Improvements or additions to documentation label Jun 15, 2026
@Daishuyuan

Contributor Author

Thanks @leonardBang, I have addressed these comments in the latest update.

@leonardBang

Contributor

Thanks @Daishuyuan for the quick response. The PR looks really good to me; I'll help polish it a little (mainly the documentation part) and merge it soon.

@leonardBang leonardBang left a comment

Contributor

+1 from my side now; waiting for CI to turn green.

@Daishuyuan

Contributor Author

@leonardBang, I pushed an update to fix the SQL Server test compilation under the Flink 2 profile.

This update replaces the removed Flink 1.x APIs used by SQL Server pipeline tests:

  • RestartStrategies -> RestartStrategyUtils
  • SavepointConfigOptions.SAVEPOINT_PATH -> StateRecoveryOptions.SAVEPOINT_PATH
  • triggerSavepoint(path) -> triggerSavepoint(path, SavepointFormatType.DEFAULT)
  • collect result setup now follows the existing MySQL/Postgres/Oracle Flink 2 compatibility pattern.

For the other failed checks, they look unrelated to this PR:

  • Source Unit Tests 2.x failed in OceanBaseFailoverITCase.testJobManagerFailoverFromLatestOffset with a result assertion mismatch.
  • Pipeline E2E Tests failed in MySqlToIcebergE2eITCase.testSyncWholeDatabase with a sink result assertion mismatch.
  • The SQL Server-related Flink 2 compilation failure should be covered by this update.

Could you please rerun the CI? Thanks.

@leonardBang

Contributor

Sure, my pleasure

@Daishuyuan

Daishuyuan commented Jun 16, 2026

Contributor Author

@leonardBang. The remaining failure seems to be caused by insufficient CI runner resources, as the OceanBase container failed to start due to not enough disk space.

OBD-2003: / not enough disk space. (Avail: 3G, Need: 10G)
boot failed!
OceanBaseE2eITCase » ContainerLaunch Container startup failed

@leonardBang

Contributor

Yeah, you're right: the failed test is flaky and isn't related to our PR. I will merge this PR soon.

…erver pipeline YAML e2e test

This closes apache#4320.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@leonardBang leonardBang merged commit b451d9a into apache:master Jun 16, 2026
Mrart pushed a commit to Mrart/flink-cdc that referenced this pull request Jun 30, 2026
…erver pipeline YAML e2e test

This closes apache#4320.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Labels: approved, docs (Improvements or additions to documentation), e2e-tests, sqlserver-cdc-connector

6 participants