From 51cd95a0e3791391371b3b2d4433f3b1da134b9d Mon Sep 17 00:00:00 2001 From: ziyanTOP <49580493+ziyanTOP@users.noreply.github.com> Date: Tue, 19 May 2026 19:09:44 +0800 Subject: [PATCH] [Fix](mongodb-cdc) Fix MongoDB connection options propagation and filter system collections This commit fixes two issues: 1. connection-options not being passed to MongoDBSourceBuilder: The internal Flink CDC key is \"connection.options\" (with dot), but users naturally pass \"connection-options\" (with hyphen). Added getConnectionOptions() to try both keys for compatibility. Also added .scheme() and .connectionOptions() calls to MongoDBSourceBuilder which were previously missing. 2. Unauthorized errors on system collections: MongoDB system collections (e.g. system.profile) cannot be accessed without elevated privileges. Added a filter to skip collections starting with \"system.\" during schema inference. --- .../tools/cdc/mongodb/MongoDBDatabaseSync.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java index b77ec0e3e..91465c340 100644 --- a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java +++ b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java @@ -109,7 +109,7 @@ public List getSchemaList() throws Exception { config.get(MongoDBSourceOptions.PASSWORD), config.get(MongoDBSourceOptions.SCHEME), config.get(MongoDBSourceOptions.HOSTS), - config.get(MongoDBSourceOptions.CONNECTION_OPTIONS)))); + getConnectionOptions()))); MongoClientSettings settings = settingsBuilder.build(); Double samplePercent = config.get(MONGO_CDC_CREATE_SAMPLE_PERCENT); @@ -119,6 +119,9 @@ public List getSchemaList() throws Exception { MongoDatabase mongoDatabase = mongoClient.getDatabase(databaseName); MongoIterable collectionNames = mongoDatabase.listCollectionNames(); for (String collectionName : collectionNames) { + if (collectionName.startsWith("system.")) { + continue; + } if (!isSyncNeeded(collectionName)) { continue; } @@ -181,6 +184,14 @@ private static String buildConnectionString( return sb.toString(); } + private String getConnectionOptions() { + String options = config.getString("connection-options", null); + if (StringUtils.isEmpty(options)) { + options = config.get(MongoDBSourceOptions.CONNECTION_OPTIONS); + } + return options; + } + @Override public DataStreamSource buildCdcSource(StreamExecutionEnvironment env) { String hosts = config.get(MongoDBSourceOptions.HOSTS); @@ -203,6 +214,8 @@ public DataStreamSource buildCdcSource(StreamExecutionEnvironment env) { .hosts(hosts) .username(username) .password(password) + .scheme(config.get(MongoDBSourceOptions.SCHEME)) + .connectionOptions(getConnectionOptions()) .databaseList(database) .collectionList(collection);