Skip to content

Commit d4b8363

Browse files
authored
S3 sink server-side encryption with KMS (#6655)
S3 sink server-side encryption with KMS Adds new configuration for encryption options in the S3 sink. Allow configuring a custom KMS key for S3 server-side encryption. Support SSE-KMS and DSSE-KMS. Supports multi-part and locally buffered options. Resolves #6528. Signed-off-by: David Venable <dlv@amazon.com>
1 parent b73188a commit d4b8363

27 files changed

Lines changed: 709 additions & 67 deletions

File tree

data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/accumulator/BufferFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
48
*/
59

610
package org.opensearch.dataprepper.plugins.accumulator;

data-prepper-plugins/s3-sink/build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
48
*/
59

610
dependencies {
@@ -77,6 +81,7 @@ task integrationTest(type: Test) {
7781
systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties'
7882
systemProperty 'tests.s3sink.bucket', System.getProperty('tests.s3sink.bucket')
7983
systemProperty 'tests.s3sink.region', System.getProperty('tests.s3sink.region')
84+
systemProperty 'tests.s3sink.kms_key', System.getProperty('tests.s3sink.kms_key')
8085

8186
filter {
8287
includeTestsMatching '*IT'

data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkIT.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
48
*/
59

610
package org.opensearch.dataprepper.plugins.sink.s3;
@@ -36,6 +40,8 @@
3640
import org.opensearch.dataprepper.plugins.sink.s3.configuration.AggregateThresholdOptions;
3741
import org.opensearch.dataprepper.plugins.sink.s3.configuration.AwsAuthenticationOptions;
3842
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions;
43+
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ServerSideEncryptionConfig;
44+
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ServerSideEncryptionType;
3945
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ThresholdOptions;
4046
import org.slf4j.Logger;
4147
import org.slf4j.LoggerFactory;
@@ -44,14 +50,18 @@
4450
import software.amazon.awssdk.regions.Region;
4551
import software.amazon.awssdk.services.s3.S3Client;
4652
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
53+
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
54+
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
4755
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
4856
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
4957
import software.amazon.awssdk.services.s3.model.S3Object;
58+
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
5059

5160
import java.io.File;
5261
import java.io.FileInputStream;
5362
import java.io.FileOutputStream;
5463
import java.io.IOException;
64+
import java.lang.reflect.Field;
5565
import java.time.Duration;
5666
import java.time.LocalDate;
5767
import java.time.LocalDateTime;
@@ -424,6 +434,57 @@ void testWithDynamicGroupsAndAggregateThreshold() throws IOException {
424434
}
425435
}
426436

437+
@ParameterizedTest
438+
@ArgumentsSource(EncryptionArguments.class)
439+
void test_server_side_encryption(final BufferTypeOptions bufferTypeOptions,
440+
final ServerSideEncryptionConfig encryptionConfig,
441+
final ServerSideEncryption expectedEncryption,
442+
final String expectedKmsKeyId) {
443+
final OutputScenario outputScenario = new NdjsonOutputScenario();
444+
final String testRun = "encryption-" + bufferTypeOptions + "-" + expectedEncryption + "-" + UUID.randomUUID();
445+
final String pathPrefix = pathPrefixForTestSuite + testRun;
446+
when(objectKeyOptions.getPathPrefix()).thenReturn(pathPrefix + "/");
447+
448+
when(pluginFactory.loadPlugin(eq(OutputCodec.class), any())).thenReturn(outputScenario.getCodec());
449+
when(s3SinkConfig.getBufferType()).thenReturn(bufferTypeOptions);
450+
when(s3SinkConfig.getCompression()).thenReturn(new NoneCompressionScenario().getCompressionOption());
451+
when(thresholdOptions.getEventCount()).thenReturn(1);
452+
453+
when(expressionEvaluator.extractDynamicExpressionsFromFormatExpression(anyString()))
454+
.thenReturn(Collections.emptyList());
455+
when(expressionEvaluator.extractDynamicKeysFromFormatExpression(anyString()))
456+
.thenReturn(Collections.emptyList());
457+
458+
when(s3SinkConfig.getServerSideEncryptionConfig()).thenReturn(encryptionConfig);
459+
460+
final S3Sink objectUnderTest = createObjectUnderTest();
461+
462+
final List<Record<Event>> events = List.of(new Record<>(generateTestEvent(generateEventData(1))));
463+
objectUnderTest.doOutput(events);
464+
465+
final ListObjectsV2Response listObjectsResponse = s3Client.listObjectsV2(ListObjectsV2Request.builder()
466+
.bucket(bucketName)
467+
.prefix(pathPrefix + "/")
468+
.build());
469+
470+
assertThat(listObjectsResponse.contents().size(), equalTo(1));
471+
472+
final String objectKey = listObjectsResponse.contents().get(0).key();
473+
final HeadObjectResponse headResponse = s3Client.headObject(HeadObjectRequest.builder()
474+
.bucket(bucketName)
475+
.key(objectKey)
476+
.build());
477+
478+
assertThat(headResponse.serverSideEncryption(), equalTo(expectedEncryption));
479+
assertThat(headResponse.ssekmsKeyId(), equalTo(expectedKmsKeyId));
480+
}
481+
482+
private static void setField(final Object object, final String fieldName, final Object value) throws Exception {
483+
final Field field = object.getClass().getDeclaredField(fieldName);
484+
field.setAccessible(true);
485+
field.set(object, value);
486+
}
487+
427488
private File decompressFileIfNecessary(OutputScenario outputScenario, CompressionScenario compressionScenario, String pathPrefix, File target) throws IOException {
428489

429490
if (outputScenario.isCompressionInternal() || !compressionScenario.requiresDecompression())
@@ -552,6 +613,33 @@ public Stream<? extends Arguments> provideArguments(ExtensionContext extensionCo
552613
}
553614
}
554615

616+
static class EncryptionArguments implements ArgumentsProvider {
617+
@Override
618+
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) throws Exception {
619+
final String kmsKeyId = System.getProperty("tests.s3sink.kms_key");
620+
621+
final ServerSideEncryptionConfig s3Config = new ServerSideEncryptionConfig();
622+
setField(s3Config, "type", ServerSideEncryptionType.S3);
623+
624+
final ServerSideEncryptionConfig kmsConfig = new ServerSideEncryptionConfig();
625+
setField(kmsConfig, "type", ServerSideEncryptionType.KMS);
626+
setField(kmsConfig, "kmsKeyId", kmsKeyId);
627+
628+
final ServerSideEncryptionConfig dsseConfig = new ServerSideEncryptionConfig();
629+
setField(dsseConfig, "type", ServerSideEncryptionType.KMS_DSSE);
630+
setField(dsseConfig, "kmsKeyId", kmsKeyId);
631+
632+
final List<BufferTypeOptions> bufferTypes = List.of(BufferTypeOptions.INMEMORY, BufferTypeOptions.MULTI_PART);
633+
634+
return bufferTypes.stream().flatMap(bufferType -> Stream.of(
635+
arguments(bufferType, null, ServerSideEncryption.AES256, null),
636+
arguments(bufferType, s3Config, ServerSideEncryption.AES256, null),
637+
arguments(bufferType, kmsConfig, ServerSideEncryption.AWS_KMS, kmsKeyId),
638+
arguments(bufferType, dsseConfig, ServerSideEncryption.AWS_KMS_DSSE, kmsKeyId)
639+
));
640+
}
641+
}
642+
555643
private static Stream<? extends Arguments> generateCombinedArguments(
556644
final List<BufferScenario> bufferScenarios,
557645
final List<OutputScenario> outputScenarios,

data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStream.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
48
*/
59
package org.opensearch.dataprepper.plugins.codec.parquet;
610

711

812
import org.apache.parquet.io.PositionOutputStream;
13+
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ServerSideEncryptionConfig;
914
import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider;
1015
import org.slf4j.Logger;
1116
import org.slf4j.LoggerFactory;
@@ -87,6 +92,8 @@ public class S3OutputStream extends PositionOutputStream {
8792

8893
private final ExecutorService executorService;
8994

95+
private final ServerSideEncryptionConfig serverSideEncryptionConfig;
96+
9097
/**
9198
* Creates a new S3 OutputStream
9299
*
@@ -95,12 +102,14 @@ public class S3OutputStream extends PositionOutputStream {
95102
* @param keySupplier path within the bucket
96103
* @param defaultBucket default bucket
97104
* @param bucketOwnerProvider bucket owner provider
105+
* @param serverSideEncryptionConfig server-side encryption config
98106
*/
99107
public S3OutputStream(final S3AsyncClient s3Client,
100108
final Supplier<String> bucketSupplier,
101109
final Supplier<String> keySupplier,
102110
final String defaultBucket,
103-
final BucketOwnerProvider bucketOwnerProvider) {
111+
final BucketOwnerProvider bucketOwnerProvider,
112+
final ServerSideEncryptionConfig serverSideEncryptionConfig) {
104113
this.s3Client = s3Client;
105114
this.bucket = bucketSupplier.get();
106115
this.key = keySupplier.get();
@@ -111,6 +120,15 @@ public S3OutputStream(final S3AsyncClient s3Client,
111120
this.defaultBucket = defaultBucket;
112121
this.executorService = Executors.newSingleThreadExecutor();
113122
this.bucketOwnerProvider = bucketOwnerProvider;
123+
this.serverSideEncryptionConfig = serverSideEncryptionConfig;
124+
}
125+
126+
public S3OutputStream(final S3AsyncClient s3Client,
127+
final Supplier<String> bucketSupplier,
128+
final Supplier<String> keySupplier,
129+
final String defaultBucket,
130+
final BucketOwnerProvider bucketOwnerProvider) {
131+
this(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider, null);
114132
}
115133

116134
@Override
@@ -285,12 +303,14 @@ public long getPos() throws IOException {
285303
}
286304

287305
private void createMultipartUpload() {
288-
CreateMultipartUploadRequest uploadRequest = CreateMultipartUploadRequest.builder()
306+
CreateMultipartUploadRequest.Builder builder = CreateMultipartUploadRequest.builder()
289307
.bucket(bucket)
290308
.key(key)
291-
.expectedBucketOwner(bucketOwnerProvider.getBucketOwner(bucket).orElse(null))
292-
.build();
293-
CompletableFuture<CreateMultipartUploadResponse> multipartUpload = s3Client.createMultipartUpload(uploadRequest);
309+
.expectedBucketOwner(bucketOwnerProvider.getBucketOwner(bucket).orElse(null));
310+
if (serverSideEncryptionConfig != null) {
311+
serverSideEncryptionConfig.applyTo(builder);
312+
}
313+
CompletableFuture<CreateMultipartUploadResponse> multipartUpload = s3Client.createMultipartUpload(builder.build());
294314

295315
final CreateMultipartUploadResponse response = multipartUpload.join();
296316

data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
48
*/
59

610
package org.opensearch.dataprepper.plugins.sink.s3;
@@ -18,6 +22,7 @@
1822
import org.opensearch.dataprepper.plugins.sink.s3.configuration.AwsAuthenticationOptions;
1923
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ClientOptions;
2024
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions;
25+
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ServerSideEncryptionConfig;
2126
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ThresholdOptions;
2227

2328
import java.util.Map;
@@ -108,6 +113,10 @@ private boolean isValidBucketConfig() {
108113
@JsonProperty("client")
109114
private ClientOptions clientOptions;
110115

116+
@JsonProperty("server_side_encryption")
117+
@Valid
118+
private ServerSideEncryptionConfig serverSideEncryptionConfig;
119+
111120
/**
112121
* Aws Authentication configuration Options.
113122
* @return aws authentication options.
@@ -213,4 +222,8 @@ public String getDefaultBucketOwner() {
213222
public ClientOptions getClientOptions() {
214223
return clientOptions;
215224
}
225+
226+
public ServerSideEncryptionConfig getServerSideEncryptionConfig() {
227+
return serverSideEncryptionConfig;
228+
}
216229
}
Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
48
*/
59

610
package org.opensearch.dataprepper.plugins.sink.s3.accumulator;
711

12+
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ServerSideEncryptionConfig;
813
import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider;
914
import software.amazon.awssdk.services.s3.S3AsyncClient;
1015

@@ -13,5 +18,11 @@
1318
import java.util.function.Function;
1419

1520
public interface BufferFactory {
16-
Buffer getBuffer(S3AsyncClient s3Client, Supplier<String> bucketSupplier, Supplier<String> keySupplier, String defaultBucket, Function<Integer, Map<String, String>> metadataSupplier, BucketOwnerProvider bucketOwnerProvider);
21+
Buffer getBuffer(S3AsyncClient s3Client,
22+
Supplier<String> bucketSupplier,
23+
Supplier<String> keySupplier,
24+
String defaultBucket,
25+
Function<Integer, Map<String, String>> metadataSupplier,
26+
BucketOwnerProvider bucketOwnerProvider,
27+
ServerSideEncryptionConfig serverSideEncryptionConfig);
1728
}

data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilities.java

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
48
*/
59

610
package org.opensearch.dataprepper.plugins.sink.s3.accumulator;
711

12+
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ServerSideEncryptionConfig;
813
import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider;
914
import org.slf4j.Logger;
1015
import org.slf4j.LoggerFactory;
@@ -34,17 +39,12 @@ static CompletableFuture<PutObjectResponse> putObjectOrSendToDefaultBucket(final
3439
final String targetBucket,
3540
final String defaultBucket,
3641
final Map<String, String> objectMetadata,
37-
final BucketOwnerProvider bucketOwnerProvider) {
42+
final BucketOwnerProvider bucketOwnerProvider,
43+
final ServerSideEncryptionConfig serverSideEncryptionConfig) {
3844

3945
final boolean[] defaultBucketAttempted = new boolean[1];
40-
PutObjectRequest.Builder builder = PutObjectRequest.builder()
41-
.bucket(targetBucket)
42-
.key(objectKey)
43-
.expectedBucketOwner(bucketOwnerProvider.getBucketOwner(targetBucket).orElse(null));
44-
if (objectMetadata != null) {
45-
builder = builder.metadata(objectMetadata);
46-
}
47-
return s3Client.putObject(builder.build(), requestBody)
46+
final PutObjectRequest putObjectRequest = buildPutObjectRequest(targetBucket, objectKey, objectMetadata, bucketOwnerProvider, serverSideEncryptionConfig);
47+
return s3Client.putObject(putObjectRequest, requestBody)
4848
.handle((result, ex) -> {
4949
if (ex != null) {
5050
runOnFailure.accept(ex);
@@ -53,13 +53,8 @@ static CompletableFuture<PutObjectResponse> putObjectOrSendToDefaultBucket(final
5353
(ex instanceof NoSuchBucketException || ex.getCause() instanceof NoSuchBucketException || ex.getMessage().contains(ACCESS_DENIED) || ex.getMessage().contains(INVALID_BUCKET))) {
5454
LOG.warn("Bucket {} could not be accessed, attempting to send to default_bucket {}", targetBucket, defaultBucket);
5555
defaultBucketAttempted[0] = true;
56-
return s3Client.putObject(
57-
PutObjectRequest.builder()
58-
.bucket(defaultBucket)
59-
.key(objectKey)
60-
.expectedBucketOwner(bucketOwnerProvider.getBucketOwner(defaultBucket).orElse(null))
61-
.build(),
62-
requestBody);
56+
final PutObjectRequest defaultPutObjectRequest = buildPutObjectRequest(defaultBucket, objectKey, null, bucketOwnerProvider, serverSideEncryptionConfig);
57+
return s3Client.putObject(defaultPutObjectRequest, requestBody);
6358
} else {
6459
runOnCompletion.accept(false);
6560
return CompletableFuture.completedFuture(result);
@@ -80,4 +75,22 @@ static CompletableFuture<PutObjectResponse> putObjectOrSendToDefaultBucket(final
8075
}
8176
});
8277
}
78+
79+
private static PutObjectRequest buildPutObjectRequest(final String bucket,
80+
final String objectKey,
81+
final Map<String, String> objectMetadata,
82+
final BucketOwnerProvider bucketOwnerProvider,
83+
final ServerSideEncryptionConfig serverSideEncryptionConfig) {
84+
final PutObjectRequest.Builder builder = PutObjectRequest.builder()
85+
.bucket(bucket)
86+
.key(objectKey)
87+
.expectedBucketOwner(bucketOwnerProvider.getBucketOwner(bucket).orElse(null));
88+
if (objectMetadata != null) {
89+
builder.metadata(objectMetadata);
90+
}
91+
if (serverSideEncryptionConfig != null) {
92+
serverSideEncryptionConfig.applyTo(builder);
93+
}
94+
return builder.build();
95+
}
8396
}

0 commit comments

Comments
 (0)