Enable cross-region writes in the S3 sink. #6323
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,10 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.plugins.sink.s3; | ||
|
|
| @@ -16,33 +20,23 @@ | ||
| import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; | ||
| import software.amazon.awssdk.services.s3.S3AsyncClient; | ||
| import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; | ||
| import software.amazon.awssdk.services.s3.S3Client; | ||
|
|
||
| public final class ClientFactory { | ||
| private ClientFactory() { } | ||
|
|
||
| static S3Client createS3Client(final S3SinkConfig s3SinkConfig, final AwsCredentialsSupplier awsCredentialsSupplier) { | ||
| final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialsOptions(s3SinkConfig.getAwsAuthenticationOptions()); | ||
| final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions); | ||
|
|
||
| return S3Client.builder() | ||
| .region(s3SinkConfig.getAwsAuthenticationOptions().getAwsRegion()) | ||
| .credentialsProvider(awsCredentialsProvider) | ||
| .overrideConfiguration(createOverrideConfiguration(s3SinkConfig)).build(); | ||
| } | ||
|
|
||
| static S3AsyncClient createS3AsyncClient(final S3SinkConfig s3SinkConfig, final AwsCredentialsSupplier awsCredentialsSupplier) { | ||
| final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialsOptions(s3SinkConfig.getAwsAuthenticationOptions()); | ||
| final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions); | ||
|
|
||
| - S3AsyncClientBuilder s3AsyncClientBuilder = S3AsyncClient.builder() | ||
| + final S3AsyncClientBuilder s3AsyncClientBuilder = S3AsyncClient.builder() | ||
| .region(s3SinkConfig.getAwsAuthenticationOptions().getAwsRegion()) | ||
| .crossRegionAccessEnabled(true) | ||
|
Member
Is it intended behavior to choose the correct region for the bucket even though the customer has configured the wrong region? When crossRegionAccessEnabled is set, the SDK automatically redirects requests to the correct bucket region. If a bucket doesn't exist in the configured region, the SDK uses the error response to identify the actual region and retries the request there.
Member
Author
Yes, this is the intended behavior, and we have started doing this in the sink in #6083. The region that the pipeline author configured is the first region attempted. If the bucket is not in that region and the IAM role has permissions for
Because the S3 sink supports dynamic bucket names, you might have a configuration that looks like this:
This won't work with a single region. |
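To make the behavior discussed in this thread concrete, here is a minimal sketch of the "configured region first, then the bucket's actual region" flow. It is a toy model in plain Java, not the AWS SDK's implementation: the class `CrossRegionSketch`, its methods, and the bucket names are all hypothetical, and the real SDK resolves the region from the service's error response rather than from a local table.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the redirect behavior described in this thread. It is NOT the
// AWS SDK implementation; the class, its methods, and the bucket table are
// hypothetical and exist only for illustration.
class CrossRegionSketch {
    // Hypothetical stand-in for the region each bucket actually lives in.
    private final Map<String, String> bucketRegions = new HashMap<>();
    private final String configuredRegion;
    private final boolean crossRegionAccessEnabled;
    // Every region contacted so far, in order.
    final List<String> attempts = new ArrayList<>();

    CrossRegionSketch(final String configuredRegion, final boolean crossRegionAccessEnabled) {
        this.configuredRegion = configuredRegion;
        this.crossRegionAccessEnabled = crossRegionAccessEnabled;
    }

    void addBucket(final String bucket, final String region) {
        bucketRegions.put(bucket, region);
    }

    // Returns the region that finally accepted the write.
    String write(final String bucket) {
        final String actualRegion = bucketRegions.get(bucket);
        if (actualRegion == null) {
            throw new IllegalArgumentException("Unknown bucket: " + bucket);
        }
        // The configured region is always attempted first.
        attempts.add(configuredRegion);
        if (configuredRegion.equals(actualRegion)) {
            return configuredRegion;
        }
        if (!crossRegionAccessEnabled) {
            throw new IllegalStateException(bucket + " is not in " + configuredRegion);
        }
        // The failed attempt identifies the bucket's region, so retry there.
        attempts.add(actualRegion);
        return actualRegion;
    }

    public static void main(final String[] args) {
        final CrossRegionSketch sink = new CrossRegionSketch("us-west-2", true);
        sink.addBucket("logs-west", "us-west-2");
        sink.addBucket("logs-east", "us-east-1");
        System.out.println(sink.write("logs-west"));
        System.out.println(sink.write("logs-east"));
        System.out.println(sink.attempts);
    }
}
```

With the flag off, the same write to a bucket in another region fails in this model, which is the single-region limitation the reply above refers to.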
||
| .credentialsProvider(awsCredentialsProvider) | ||
| .overrideConfiguration(createOverrideConfiguration(s3SinkConfig)); | ||
|
|
||
| if (s3SinkConfig.getClientOptions() != null) { | ||
| final ClientOptions clientOptions = s3SinkConfig.getClientOptions(); | ||
| - SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder() | ||
| + final SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder() | ||
| .connectionAcquisitionTimeout(clientOptions.getAcquireTimeout()) | ||
| .maxConcurrency(clientOptions.getMaxConnections()) | ||
| .build(); | ||
|
|
||
This one should be the bucket's region, right?

This is the region that is configured in the pipeline configuration. It should be the bucket's region, or a preferred region if the pipeline configuration writes to buckets scattered across regions. I suppose if you were running in us-west-2 and writing to buckets in us-east-1 and us-west-2, you'd probably want to start with the first.
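One way to reason about the choice of starting region is to list which buckets would miss on the first attempt for each candidate. The sketch below does only that. `StartingRegionSketch`, `redirectedBuckets`, and the bucket names are hypothetical and are not part of Data Prepper or the AWS SDK. It also ignores latency and cost, which may matter more than the redirect count.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Data Prepper or the AWS SDK.
class StartingRegionSketch {
    // Lists the buckets whose first attempt would miss when starting from startRegion.
    static List<String> redirectedBuckets(final String startRegion,
                                          final Map<String, String> bucketRegions) {
        final List<String> redirected = new ArrayList<>();
        for (final Map.Entry<String, String> entry : bucketRegions.entrySet()) {
            if (!entry.getValue().equals(startRegion)) {
                redirected.add(entry.getKey());
            }
        }
        return redirected;
    }

    public static void main(final String[] args) {
        // Assumed bucket names and placement, for illustration only.
        final Map<String, String> bucketRegions = new LinkedHashMap<>();
        bucketRegions.put("logs-east", "us-east-1");
        bucketRegions.put("logs-west", "us-west-2");
        System.out.println(redirectedBuckets("us-east-1", bucketRegions));
        System.out.println(redirectedBuckets("us-west-2", bucketRegions));
    }
}
```

In the two-bucket case from the comment above, either starting region leaves exactly one bucket needing a redirect, so the count alone does not settle the choice.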