Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins.sink.s3;
Expand All @@ -16,33 +20,23 @@
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;

public final class ClientFactory {
private ClientFactory() { }

static S3Client createS3Client(final S3SinkConfig s3SinkConfig, final AwsCredentialsSupplier awsCredentialsSupplier) {
final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialsOptions(s3SinkConfig.getAwsAuthenticationOptions());
final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions);

return S3Client.builder()
.region(s3SinkConfig.getAwsAuthenticationOptions().getAwsRegion())
.credentialsProvider(awsCredentialsProvider)
.overrideConfiguration(createOverrideConfiguration(s3SinkConfig)).build();
}

static S3AsyncClient createS3AsyncClient(final S3SinkConfig s3SinkConfig, final AwsCredentialsSupplier awsCredentialsSupplier) {
final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialsOptions(s3SinkConfig.getAwsAuthenticationOptions());
final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions);

S3AsyncClientBuilder s3AsyncClientBuilder = S3AsyncClient.builder()
final S3AsyncClientBuilder s3AsyncClientBuilder = S3AsyncClient.builder()
.region(s3SinkConfig.getAwsAuthenticationOptions().getAwsRegion())

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one should be bucket's region - right?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the region that is configured in the pipeline configuration. It should be the bucket's region, or a preferred region if the buckets are all scattered when using pipeline configurations. I suppose if you were running in us-west-2 and writing to buckets in us-east-1 and us-west-2, you'd probably want to start with the first.

.crossRegionAccessEnabled(true)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this intended behavior to choose the correct region for the bucket even though customer has configured with wrong region ?

When crossRegionAccessEnabled is set, the SDK automatically redirects requests to the correct bucket region. If a bucket doesn't exist in the configured region, the SDK uses the error response to identify the actual region and retries the request there.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is the intended behavior. And we have started doing this in the sink in #6083.

The region that the pipeline author configured is the first region attempted. If the bucket is not in that region and the IAM role has permissions for s3:GetBucketLocation, then S3 provides the region in a redirect. Then the SDK will resign for that region and send to that regional endpoint. It is that last step that this change provides.

Because the S3 sink supports dynamic bucket names, you might have a configuration that looks like this:

bucket: mycompany-${/aws/region}

This won't work with a single region.

.credentialsProvider(awsCredentialsProvider)
.overrideConfiguration(createOverrideConfiguration(s3SinkConfig));

if (s3SinkConfig.getClientOptions() != null) {
final ClientOptions clientOptions = s3SinkConfig.getClientOptions();
SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
final SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
.connectionAcquisitionTimeout(clientOptions.getAcquireTimeout())
.maxConcurrency(clientOptions.getMaxConnections())
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins.sink.s3;
Expand All @@ -25,8 +29,6 @@
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;

import java.time.Duration;
import java.util.Map;
Expand Down Expand Up @@ -62,14 +64,14 @@ void setUp() {
@Test
void createS3AsyncClient_with_real_S3AsyncClient() {
when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1);
final S3Client s3Client = ClientFactory.createS3Client(s3SinkConfig, awsCredentialsSupplier);
final S3AsyncClient s3Client = ClientFactory.createS3AsyncClient(s3SinkConfig, awsCredentialsSupplier);

assertThat(s3Client, notNullValue());
}

@ParameterizedTest
@ValueSource(strings = {"us-east-1", "us-west-2", "eu-central-1"})
void createS3Client_provides_correct_inputs(final String regionString) {
void createS3AsyncClient_with_client_options_returns_expected_client(final String regionString) {
final Region region = Region.of(regionString);
final String stsRoleArn = UUID.randomUUID().toString();
final String externalId = UUID.randomUUID().toString();
Expand All @@ -82,49 +84,9 @@ void createS3Client_provides_correct_inputs(final String regionString) {
final AwsCredentialsProvider expectedCredentialsProvider = mock(AwsCredentialsProvider.class);
when(awsCredentialsSupplier.getProvider(any())).thenReturn(expectedCredentialsProvider);

final S3ClientBuilder s3ClientBuilder = mock(S3ClientBuilder.class);
when(s3ClientBuilder.region(region)).thenReturn(s3ClientBuilder);
when(s3ClientBuilder.credentialsProvider(any())).thenReturn(s3ClientBuilder);
when(s3ClientBuilder.overrideConfiguration(any(ClientOverrideConfiguration.class))).thenReturn(s3ClientBuilder);
try(final MockedStatic<S3Client> s3ClientMockedStatic = mockStatic(S3Client.class)) {
s3ClientMockedStatic.when(S3Client::builder)
.thenReturn(s3ClientBuilder);
ClientFactory.createS3Client(s3SinkConfig, awsCredentialsSupplier);
}

final ArgumentCaptor<AwsCredentialsProvider> credentialsProviderArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsProvider.class);
verify(s3ClientBuilder).credentialsProvider(credentialsProviderArgumentCaptor.capture());

final AwsCredentialsProvider actualCredentialsProvider = credentialsProviderArgumentCaptor.getValue();

assertThat(actualCredentialsProvider, equalTo(expectedCredentialsProvider));

final ArgumentCaptor<AwsCredentialsOptions> optionsArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsOptions.class);
verify(awsCredentialsSupplier).getProvider(optionsArgumentCaptor.capture());

final AwsCredentialsOptions actualCredentialsOptions = optionsArgumentCaptor.getValue();
assertThat(actualCredentialsOptions.getRegion(), equalTo(region));
assertThat(actualCredentialsOptions.getStsRoleArn(), equalTo(stsRoleArn));
assertThat(actualCredentialsOptions.getStsExternalId(), equalTo(externalId));
assertThat(actualCredentialsOptions.getStsHeaderOverrides(), equalTo(stsHeaderOverrides));
}

@Test
void createS3AsyncClient_with_client_options_returns_expected_client() {
final Region region = Region.of("us-east-1");
final String stsRoleArn = UUID.randomUUID().toString();
final String externalId = UUID.randomUUID().toString();
final Map<String, String> stsHeaderOverrides = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
when(awsAuthenticationOptions.getAwsRegion()).thenReturn(region);
when(awsAuthenticationOptions.getAwsStsRoleArn()).thenReturn(stsRoleArn);
when(awsAuthenticationOptions.getAwsStsExternalId()).thenReturn(externalId);
when(awsAuthenticationOptions.getAwsStsHeaderOverrides()).thenReturn(stsHeaderOverrides);

final AwsCredentialsProvider expectedCredentialsProvider = mock(AwsCredentialsProvider.class);
when(awsCredentialsSupplier.getProvider(any())).thenReturn(expectedCredentialsProvider);

final S3AsyncClientBuilder s3AsyncClientBuilder = mock(S3AsyncClientBuilder.class);
when(s3AsyncClientBuilder.region(region)).thenReturn(s3AsyncClientBuilder);
when(s3AsyncClientBuilder.crossRegionAccessEnabled(true)).thenReturn(s3AsyncClientBuilder);
when(s3AsyncClientBuilder.credentialsProvider(any())).thenReturn(s3AsyncClientBuilder);
when(s3AsyncClientBuilder.overrideConfiguration(any(ClientOverrideConfiguration.class))).thenReturn(s3AsyncClientBuilder);

Expand Down
Loading