From 6419298755356bc86f248cf38fbbb9857df32305 Mon Sep 17 00:00:00 2001 From: Daniel McKay Date: Wed, 21 Jan 2026 18:35:49 +0000 Subject: [PATCH 1/2] Add support for IAM authentication in connection string for DocumentDB source Signed-off-by: Daniel McKay --- data-prepper-plugins/mongodb/build.gradle | 1 + .../mongo/client/MongoDBConnection.java | 57 +++++++++---- .../configuration/MongoDBSourceConfig.java | 15 ++++ .../mongo/documentdb/DocumentDBSource.java | 2 + .../mongo/client/MongoDBConnectionTest.java | 28 ++++-- .../MongoDBSourceConfigTest.java | 85 +++++++++++++++++++ 6 files changed, 164 insertions(+), 24 deletions(-) create mode 100644 data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfigTest.java diff --git a/data-prepper-plugins/mongodb/build.gradle b/data-prepper-plugins/mongodb/build.gradle index 044d7b017c..8ef869eb47 100644 --- a/data-prepper-plugins/mongodb/build.gradle +++ b/data-prepper-plugins/mongodb/build.gradle @@ -8,6 +8,7 @@ dependencies { implementation 'com.fasterxml.jackson.core:jackson-core' implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' implementation 'software.amazon.awssdk:s3' implementation project(path: ':data-prepper-plugins:aws-plugin-api') diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnection.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnection.java index 224d8b0f54..6995473d67 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnection.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnection.java @@ -2,6 +2,7 @@ import com.mongodb.ConnectionString; import com.mongodb.MongoClientSettings; +import com.mongodb.MongoCredential; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig; @@ -13,14 +14,23 @@ import java.util.Objects; public class MongoDBConnection { - private static final String MONGO_CONNECTION_STRING_TEMPLATE = "mongodb://%s:%s@%s:%s/?replicaSet=rs0&readpreference=%s&ssl=%s&tlsAllowInvalidHostnames=%s&directConnection=%s"; + private static final String IAM_AUTH_SOURCE = "$external"; + private static final String IAM_AUTH_MECHANISM = "MONGODB-AWS"; + private static final String MONGO_PASSWORD_CONNECTION_STRING_TEMPLATE = "mongodb://%s:%s@%s:%s/?replicaSet=rs0&readpreference=%s&ssl=%s&tlsAllowInvalidHostnames=%s&directConnection=%s"; + private static final String MONGO_IAM_CONNECTION_STRING_TEMPLATE = "mongodb://%s:%s/?replicaSet=rs0&readpreference=%s&ssl=%s&tlsAllowInvalidHostnames=%s&directConnection=%s&authSource=%s&authMechanism=%s"; public static MongoClient getMongoClient(final MongoDBSourceConfig sourceConfig) { - final String connectionString = getConnectionString(sourceConfig); + final boolean usesIAMAuthentication = usesIAMAuthentication(sourceConfig); + final String connectionString = getConnectionString(sourceConfig, usesIAMAuthentication); final MongoClientSettings.Builder settingBuilder = MongoClientSettings.builder() .applyConnectionString(new ConnectionString(connectionString)); + if (usesIAMAuthentication) { + // Create an empty credential. This triggers mongo to use the underlying IAM role. + final MongoCredential credential = MongoCredential.createAwsCredential(null, null); + settingBuilder.credential(credential); + } if (Objects.nonNull(sourceConfig.getTrustStoreFilePath())) { final File truststoreFilePath = new File(sourceConfig.getTrustStoreFilePath()); @@ -39,7 +49,23 @@ private static String encodeString(final String input) { return URLEncoder.encode(input, StandardCharsets.UTF_8); } - private static String getConnectionString(final MongoDBSourceConfig sourceConfig) { + private static String getConnectionString(final MongoDBSourceConfig sourceConfig, final boolean usesIamAuth) { + // Support for only single host + final String hostname = sourceConfig.getHost(); + final int port = sourceConfig.getPort(); + final String tls = sourceConfig.getTls().toString(); + final String invalidHostAllowed = sourceConfig.getSslInsecureDisableVerification().toString(); + final String readPreference = sourceConfig.getReadPreference(); + final String directionConnection = sourceConfig.getDirectConnection().toString(); + + if (sourceConfig.getHost() == null || sourceConfig.getHost().isBlank()) { + throw new RuntimeException("The host should not be null or empty."); + } + + if (usesIamAuth) { + return String.format(MONGO_IAM_CONNECTION_STRING_TEMPLATE, hostname, port, readPreference, tls, invalidHostAllowed, directionConnection, encodeString(IAM_AUTH_SOURCE), encodeString(IAM_AUTH_MECHANISM)); + } + final String username; try { username = encodeString(sourceConfig.getAuthenticationConfig().getUsername()); @@ -54,18 +80,19 @@ private static String getConnectionString(final MongoDBSourceConfig sourceConfig throw new RuntimeException("Unsupported characters in password."); } - if (sourceConfig.getHost() == null || sourceConfig.getHost().isBlank()) { - throw new RuntimeException("The host should not be null or empty."); - } + return String.format(MONGO_PASSWORD_CONNECTION_STRING_TEMPLATE, username, password, hostname, port, readPreference, tls, invalidHostAllowed, directionConnection); + } - // Support for only single host - final String hostname = sourceConfig.getHost(); - final int port = sourceConfig.getPort(); - final String tls = sourceConfig.getTls().toString(); - final String invalidHostAllowed = sourceConfig.getSslInsecureDisableVerification().toString(); - final String readPreference = sourceConfig.getReadPreference(); - final String directionConnection = sourceConfig.getDirectConnection().toString(); - return String.format(MONGO_CONNECTION_STRING_TEMPLATE, username, password, hostname, port, - readPreference, tls, invalidHostAllowed, directionConnection); + private static boolean usesIAMAuthentication(final MongoDBSourceConfig sourceConfig) { + final boolean hasUsernamePassword = Objects.nonNull(sourceConfig.getAuthenticationConfig()) && + (Objects.nonNull(sourceConfig.getAuthenticationConfig().getUsername()) || + Objects.nonNull(sourceConfig.getAuthenticationConfig().getPassword())); + + if (hasUsernamePassword) { + return false; + } + + return Objects.nonNull(sourceConfig.getAwsConfig()) && + Objects.nonNull(sourceConfig.getAwsConfig().getAwsStsRoleArn()); } } diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java index 523413da06..8b497c8fec 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java @@ -8,6 +8,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Objects; public class MongoDBSourceConfig { private static final int DEFAULT_PORT = 27017; @@ -163,4 +164,18 @@ public String getPassword() { } } + + public void validateAwsConfigWithUsernameAndPassword() { + final boolean hasUsernamePassword = Objects.nonNull(authenticationConfig) && + (Objects.nonNull(authenticationConfig.getUsername()) || Objects.nonNull(authenticationConfig.getPassword())); + final boolean hasAwsAuth = Objects.nonNull(awsConfig) && Objects.nonNull(awsConfig.getAwsStsRoleArn()); + + if (hasUsernamePassword && hasAwsAuth) { + throw new IllegalArgumentException("Either username and password, or aws sts_role_arn must be specified. Both cannot be set at once."); + } + + if (!hasUsernamePassword && !hasAwsAuth) { + throw new IllegalArgumentException("Either username and password, or aws sts_role_arn must be specified."); + } + } } diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBSource.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBSource.java index 6a8e82a8a9..d933df8640 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBSource.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBSource.java @@ -47,6 +47,8 @@ public DocumentDBSource(final PluginMetrics pluginMetrics, this.acknowledgementSetManager = acknowledgementSetManager; this.pluginConfigObservable = pluginConfigObservable; this.acknowledgementsEnabled = sourceConfig.isAcknowledgmentsEnabled(); + + sourceConfig.validateAwsConfigWithUsernameAndPassword(); } @Override diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnectionTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnectionTest.java index 30bc721c49..d594eba3c9 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnectionTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnectionTest.java @@ -6,6 +6,7 @@ import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.mongo.configuration.AwsConfig; import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig; import org.opensearch.dataprepper.plugins.truststore.TrustStoreProvider; @@ -30,12 +31,13 @@ public class MongoDBConnectionTest { @Mock private MongoDBSourceConfig.AuthenticationConfig authenticationConfig; + @Mock + private AwsConfig awsConfig; + private final Random random = new Random(); void setUp() { when(mongoDBSourceConfig.getAuthenticationConfig()).thenReturn(authenticationConfig); - when(authenticationConfig.getUsername()).thenReturn("\uD800\uD800" + UUID.randomUUID()); - when(authenticationConfig.getPassword()).thenReturn("aЯ ⾀sd?q=%%l€0£.lo" + UUID.randomUUID()); when(mongoDBSourceConfig.getHost()).thenReturn(UUID.randomUUID().toString()); when(mongoDBSourceConfig.getPort()).thenReturn(getRandomInteger()); when(mongoDBSourceConfig.getTls()).thenReturn(getRandomBoolean()); @@ -44,8 +46,10 @@ void setUp() { } @Test - public void getMongoClient() { + public void getMongoClientWithUsernamePassword() { setUp(); + when(authenticationConfig.getUsername()).thenReturn("\uD800\uD800" + UUID.randomUUID()); + when(authenticationConfig.getPassword()).thenReturn("aЯ ⾀sd?q=%%l€0£.lo" + UUID.randomUUID()); final MongoClient mongoClient = MongoDBConnection.getMongoClient(mongoDBSourceConfig); assertThat(mongoClient, is(notNullValue())); } @@ -53,6 +57,8 @@ public void getMongoClient() { @Test public void getMongoClientWithTLS() { setUp(); + when(authenticationConfig.getUsername()).thenReturn("\uD800\uD800" + UUID.randomUUID()); + when(authenticationConfig.getPassword()).thenReturn("aЯ ⾀sd?q=%%l€0£.lo" + UUID.randomUUID()); when(mongoDBSourceConfig.getTrustStoreFilePath()).thenReturn(UUID.randomUUID().toString()); when(mongoDBSourceConfig.getTrustStorePassword()).thenReturn(UUID.randomUUID().toString()); final Path path = mock(Path.class); @@ -68,22 +74,26 @@ public void getMongoClientWithTLS() { @Test public void getMongoClientNullHost() { - when(mongoDBSourceConfig.getAuthenticationConfig()).thenReturn(authenticationConfig); - when(authenticationConfig.getUsername()).thenReturn("\uD800\uD800" + UUID.randomUUID()); - when(authenticationConfig.getPassword()).thenReturn("aЯ ⾀sd?q=%%l€0£.lo" + UUID.randomUUID()); when(mongoDBSourceConfig.getHost()).thenReturn(null); assertThrows(RuntimeException.class, () -> MongoDBConnection.getMongoClient(mongoDBSourceConfig)); } @Test public void getMongoClientEmptyHost() { - when(mongoDBSourceConfig.getAuthenticationConfig()).thenReturn(authenticationConfig); - when(authenticationConfig.getUsername()).thenReturn("\uD800\uD800" + UUID.randomUUID()); - when(authenticationConfig.getPassword()).thenReturn("aЯ ⾀sd?q=%%l€0£.lo" + UUID.randomUUID()); when(mongoDBSourceConfig.getHost()).thenReturn(" "); assertThrows(RuntimeException.class, () -> MongoDBConnection.getMongoClient(mongoDBSourceConfig)); } + @Test + public void getMongoClientWithIAMAuth() { + setUp(); + when(mongoDBSourceConfig.getAuthenticationConfig()).thenReturn(null); + when(mongoDBSourceConfig.getAwsConfig()).thenReturn(awsConfig); + when(awsConfig.getAwsStsRoleArn()).thenReturn("arn:aws:iam::123456789012:role/testRole"); + final MongoClient mongoClient = MongoDBConnection.getMongoClient(mongoDBSourceConfig); + assertThat(mongoClient, is(notNullValue())); + } + private Boolean getRandomBoolean() { return random.nextBoolean(); } diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfigTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfigTest.java new file mode 100644 index 0000000000..0ea6ce57f2 --- /dev/null +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfigTest.java @@ -0,0 +1,85 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.mongo.configuration; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class MongoDBSourceConfigTest { + + private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + + @Test + void username_password_only() throws JsonProcessingException { + final String configYaml = + "host: \"localhost\"\n" + + "authentication:\n" + + " username: test\n" + + " password: test\n" + + "collections:\n" + + " - collection: test\n"; + + final MongoDBSourceConfig config = objectMapper.readValue(configYaml, MongoDBSourceConfig.class); + + config.validateAwsConfigWithUsernameAndPassword(); + assertThat(config.getAuthenticationConfig(), notNullValue()); + assertThat(config.getAuthenticationConfig().getUsername(), equalTo("test")); + assertThat(config.getAuthenticationConfig().getPassword(), equalTo("test")); + assertThat(config.getAwsConfig(), nullValue()); + } + + @Test + void aws_sts_role_arn_only() throws JsonProcessingException { + final String configYaml = + "host: \"localhost\"\n" + + "aws:\n" + + " sts_role_arn: \"arn:aws:iam::123456789012:role/test-role\"\n" + + "collections:\n" + + " - collection: test\n"; + + final MongoDBSourceConfig config = objectMapper.readValue(configYaml, MongoDBSourceConfig.class); + + config.validateAwsConfigWithUsernameAndPassword(); + assertThat(config.getAwsConfig(), notNullValue()); + assertThat(config.getAwsConfig().getAwsStsRoleArn(), equalTo("arn:aws:iam::123456789012:role/test-role")); + assertThat(config.getAuthenticationConfig(), nullValue()); + } + + @Test + void both_username_password_and_aws_is_invalid() throws JsonProcessingException { + final String configYaml = + "host: \"localhost\"\n" + + "authentication:\n" + + " username: test\n" + + " password: test\n" + + "aws:\n" + + " sts_role_arn: \"arn:aws:iam::123456789012:role/test-role\"\n" + + "collections:\n" + + " - collection: test\n"; + + final MongoDBSourceConfig config = objectMapper.readValue(configYaml, MongoDBSourceConfig.class); + assertThrows(IllegalArgumentException.class, config::validateAwsConfigWithUsernameAndPassword); + } + + @Test + void neither_username_password_nor_aws_is_invalid() throws JsonProcessingException { + final String configYaml = + "host: \"localhost\"\n" + + "collections:\n" + + " - collection: test\n"; + + final MongoDBSourceConfig config = objectMapper.readValue(configYaml, MongoDBSourceConfig.class); + assertThrows(IllegalArgumentException.class, config::validateAwsConfigWithUsernameAndPassword); + } +} From ce47e2981797c7e72f0dde6f2234194b1aa29628 Mon Sep 17 00:00:00 2001 From: Daniel McKay Date: Mon, 16 Feb 2026 19:13:53 +0000 Subject: [PATCH 2/2] update license header on MongoDBSourceConfigTest Signed-off-by: Daniel McKay --- .../plugins/mongo/configuration/MongoDBSourceConfigTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfigTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfigTest.java index 0ea6ce57f2..6912298667 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfigTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfigTest.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.mongo.configuration;