diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ClientFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ClientFactory.java index f647057af4..3803ffeda9 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ClientFactory.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ClientFactory.java @@ -1,6 +1,10 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ package org.opensearch.dataprepper.plugins.sink.s3; @@ -16,33 +20,23 @@ import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; -import software.amazon.awssdk.services.s3.S3Client; public final class ClientFactory { private ClientFactory() { } - static S3Client createS3Client(final S3SinkConfig s3SinkConfig, final AwsCredentialsSupplier awsCredentialsSupplier) { - final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialsOptions(s3SinkConfig.getAwsAuthenticationOptions()); - final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions); - - return S3Client.builder() - .region(s3SinkConfig.getAwsAuthenticationOptions().getAwsRegion()) - .credentialsProvider(awsCredentialsProvider) - .overrideConfiguration(createOverrideConfiguration(s3SinkConfig)).build(); - } - static S3AsyncClient createS3AsyncClient(final S3SinkConfig s3SinkConfig, final AwsCredentialsSupplier awsCredentialsSupplier) { final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialsOptions(s3SinkConfig.getAwsAuthenticationOptions()); final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions); - S3AsyncClientBuilder s3AsyncClientBuilder = S3AsyncClient.builder() + final S3AsyncClientBuilder s3AsyncClientBuilder = S3AsyncClient.builder() .region(s3SinkConfig.getAwsAuthenticationOptions().getAwsRegion()) + .crossRegionAccessEnabled(true) .credentialsProvider(awsCredentialsProvider) .overrideConfiguration(createOverrideConfiguration(s3SinkConfig)); if (s3SinkConfig.getClientOptions() != null) { final ClientOptions clientOptions = s3SinkConfig.getClientOptions(); - SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder() + final SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder() .connectionAcquisitionTimeout(clientOptions.getAcquireTimeout()) .maxConcurrency(clientOptions.getMaxConnections()) .build(); diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ClientFactoryTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ClientFactoryTest.java index 947bc728e9..b334be8e6d 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ClientFactoryTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ClientFactoryTest.java @@ -1,6 +1,10 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ package org.opensearch.dataprepper.plugins.sink.s3; @@ -25,8 +29,6 @@ import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.S3ClientBuilder; import java.time.Duration; import java.util.Map; @@ -62,14 +64,14 @@ void setUp() { @Test void createS3AsyncClient_with_real_S3AsyncClient() { when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1); - final S3Client s3Client = ClientFactory.createS3Client(s3SinkConfig, awsCredentialsSupplier); + final S3AsyncClient s3Client = ClientFactory.createS3AsyncClient(s3SinkConfig, awsCredentialsSupplier); assertThat(s3Client, notNullValue()); } @ParameterizedTest @ValueSource(strings = {"us-east-1", "us-west-2", "eu-central-1"}) - void createS3Client_provides_correct_inputs(final String regionString) { + void createS3AsyncClient_with_client_options_returns_expected_client(final String regionString) { final Region region = Region.of(regionString); final String stsRoleArn = UUID.randomUUID().toString(); final String externalId = UUID.randomUUID().toString(); @@ -82,49 +84,9 @@ void createS3Client_provides_correct_inputs(final String regionString) { final AwsCredentialsProvider expectedCredentialsProvider = mock(AwsCredentialsProvider.class); when(awsCredentialsSupplier.getProvider(any())).thenReturn(expectedCredentialsProvider); - final S3ClientBuilder s3ClientBuilder = mock(S3ClientBuilder.class); - when(s3ClientBuilder.region(region)).thenReturn(s3ClientBuilder); - when(s3ClientBuilder.credentialsProvider(any())).thenReturn(s3ClientBuilder); - when(s3ClientBuilder.overrideConfiguration(any(ClientOverrideConfiguration.class))).thenReturn(s3ClientBuilder); - try(final MockedStatic s3ClientMockedStatic = mockStatic(S3Client.class)) { - s3ClientMockedStatic.when(S3Client::builder) - .thenReturn(s3ClientBuilder); - ClientFactory.createS3Client(s3SinkConfig, awsCredentialsSupplier); - } - - final ArgumentCaptor credentialsProviderArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsProvider.class); - verify(s3ClientBuilder).credentialsProvider(credentialsProviderArgumentCaptor.capture()); - - final AwsCredentialsProvider actualCredentialsProvider = credentialsProviderArgumentCaptor.getValue(); - - assertThat(actualCredentialsProvider, equalTo(expectedCredentialsProvider)); - - final ArgumentCaptor optionsArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsOptions.class); - verify(awsCredentialsSupplier).getProvider(optionsArgumentCaptor.capture()); - - final AwsCredentialsOptions actualCredentialsOptions = optionsArgumentCaptor.getValue(); - assertThat(actualCredentialsOptions.getRegion(), equalTo(region)); - assertThat(actualCredentialsOptions.getStsRoleArn(), equalTo(stsRoleArn)); - assertThat(actualCredentialsOptions.getStsExternalId(), equalTo(externalId)); - assertThat(actualCredentialsOptions.getStsHeaderOverrides(), equalTo(stsHeaderOverrides)); - } - - @Test - void createS3AsyncClient_with_client_options_returns_expected_client() { - final Region region = Region.of("us-east-1"); - final String stsRoleArn = UUID.randomUUID().toString(); - final String externalId = UUID.randomUUID().toString(); - final Map stsHeaderOverrides = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); - when(awsAuthenticationOptions.getAwsRegion()).thenReturn(region); - when(awsAuthenticationOptions.getAwsStsRoleArn()).thenReturn(stsRoleArn); - when(awsAuthenticationOptions.getAwsStsExternalId()).thenReturn(externalId); - when(awsAuthenticationOptions.getAwsStsHeaderOverrides()).thenReturn(stsHeaderOverrides); - - final AwsCredentialsProvider expectedCredentialsProvider = mock(AwsCredentialsProvider.class); - when(awsCredentialsSupplier.getProvider(any())).thenReturn(expectedCredentialsProvider); - final S3AsyncClientBuilder s3AsyncClientBuilder = mock(S3AsyncClientBuilder.class); when(s3AsyncClientBuilder.region(region)).thenReturn(s3AsyncClientBuilder); + when(s3AsyncClientBuilder.crossRegionAccessEnabled(true)).thenReturn(s3AsyncClientBuilder); when(s3AsyncClientBuilder.credentialsProvider(any())).thenReturn(s3AsyncClientBuilder); when(s3AsyncClientBuilder.overrideConfiguration(any(ClientOverrideConfiguration.class))).thenReturn(s3AsyncClientBuilder);