diff --git a/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfig.java b/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfig.java index 8a7e2a97c9..2c747b91be 100644 --- a/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfig.java +++ b/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfig.java @@ -12,10 +12,14 @@ import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.core.checksums.RequestChecksumCalculation; +import software.amazon.awssdk.core.checksums.ResponseChecksumValidation; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.retry.RetryPolicy; import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.LegacyMd5Plugin; import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; import software.amazon.awssdk.services.sts.StsClient; import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; @@ -36,6 +40,8 @@ public class S3DlqWriterConfig { private static final String AWS_IAM_ROLE = "role"; private static final String AWS_IAM = "iam"; private static final String S3_PREFIX = "s3://"; + private static final boolean FORCE_PATH_STYLE = false; + private static final boolean LEGACY_MD5_CHECKSUM = false; @JsonProperty("bucket") @NotEmpty @@ -50,6 +56,12 @@ public class S3DlqWriterConfig { @Size(min = 1, message = "region cannot be empty string") private String region = DEFAULT_AWS_REGION; + @JsonProperty("force_path_style") + private boolean forcePathStyle = FORCE_PATH_STYLE; + + @JsonProperty("legacy_md5_checksum") + private boolean legacyMd5Checksum = LEGACY_MD5_CHECKSUM; + @JsonProperty("sts_role_arn") @Size(min = 20, max = 2048, message = "sts_role_arn length should be between 1 and 2048 characters") private String stsRoleArn; @@ -76,6 +88,14 @@ public String getKeyPathPrefix() { return keyPathPrefix; } + public boolean getForcePathStyle() { + return forcePathStyle; + } + + public boolean getLegacyMd5Checksum() { + return legacyMd5Checksum; + } + public Region getRegion() { return Region.of(region); } @@ -133,12 +153,20 @@ private Arn getArn() { } public S3Client getS3Client() { - return S3Client.builder() + S3ClientBuilder s3ClientBuilder = S3Client.builder() .region(this.getRegion()) + .forcePathStyle(this.getForcePathStyle()) .credentialsProvider(this.getAwsCredentialsProvider()) .overrideConfiguration(ClientOverrideConfiguration.builder() .retryPolicy(RetryPolicy.builder().numRetries(MAX_NUMBER_OF_RETRIES).build()) - .build()) - .build(); + .build()); + + if (legacyMd5Checksum == true) { + s3ClientBuilder.requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED) + .responseChecksumValidation(ResponseChecksumValidation.WHEN_REQUIRED) + .addPlugin(LegacyMd5Plugin.create()); + } + + return s3ClientBuilder.build(); } } diff --git a/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java b/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java index c57b6e3d47..dcb4021f61 100644 --- a/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java +++ b/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java @@ -35,6 +35,16 @@ public void testDefaultKeyPathPrefix() { assertThat(new S3DlqWriterConfig().getKeyPathPrefix(), is(equalTo(null))); } + @Test + public void testDefaultForcePathStyle() { + assertThat(new S3DlqWriterConfig().getForcePathStyle(), is(equalTo(false))); + } + + @Test + public void testDefaultLegacyMd5Checksum() { + assertThat(new S3DlqWriterConfig().getLegacyMd5Checksum(), is(equalTo(false))); + } + @ParameterizedTest @ValueSource(strings = {"foobar", "arn:aws:es:us-west-2:123456789012:domain/bogus-domain", "arn:aws:iam::123456789012:group/bogus-group"}) @@ -62,6 +72,24 @@ public void getS3ClientWithValidStsRoleArn(final String stsRoleArn) throws NoSuc assertThat(s3Client, is(notNullValue())); } + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void getS3ClientWithValidAccessStyle(final boolean forcePathStyle) throws NoSuchFieldException, IllegalAccessException { + final S3DlqWriterConfig config = new S3DlqWriterConfig(); + reflectivelySetField(config, "forcePathStyle", forcePathStyle); + final S3Client s3Client = config.getS3Client(); + assertThat(s3Client, is(notNullValue())); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void getS3ClientWithLegacyMd5Checksum(final boolean legacyMd5Checksum) throws NoSuchFieldException, IllegalAccessException { + final S3DlqWriterConfig config = new S3DlqWriterConfig(); + reflectivelySetField(config, "legacyMd5Checksum", legacyMd5Checksum); + final S3Client s3Client = config.getS3Client(); + assertThat(s3Client, is(notNullValue())); + } + @ParameterizedTest @NullSource @ValueSource(strings = {"", "arn:aws:iam::123456789012:role/some-role"}) diff --git a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandler.java b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandler.java index a3b6dd4995..c3101e34f6 100644 --- a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandler.java +++ b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandler.java @@ -49,6 +49,10 @@ public class DlqPushHandler { private static final String KEY_PATH_PREFIX = "key_path_prefix"; + private static final String FORCE_PATH_STYLE = "force_path_style"; + + private static final String LEGACY_MD5_CHECKSUM = "legacy_md5_checksum"; + private String dlqFile; private String keyPathPrefix; @@ -62,12 +66,14 @@ public DlqPushHandler(final String dlqFile, final String bucket, final String stsRoleArn, final String awsRegion, + final Boolean forcePathStyle, + final Boolean legacyMd5Checksum, final String dlqPathPrefix) { if(dlqFile != null) { this.dlqFile = dlqFile; this.objectWriter = new ObjectMapper().writer().withDefaultPrettyPrinter(); }else{ - this.dlqProvider = getDlqProvider(pluginFactory,bucket,stsRoleArn,awsRegion,dlqPathPrefix); + this.dlqProvider = getDlqProvider(pluginFactory,bucket,stsRoleArn,awsRegion,forcePathStyle,legacyMd5Checksum,dlqPathPrefix); } } @@ -117,11 +123,15 @@ private DlqProvider getDlqProvider(final PluginFactory pluginFactory, final String bucket, final String stsRoleArn, final String awsRegion, + final Boolean forcePathStyle, + final Boolean legacyMd5Checksum, final String dlqPathPrefix) { final Map props = new HashMap<>(); props.put(BUCKET, bucket); props.put(ROLE_ARN, stsRoleArn); props.put(REGION, awsRegion); + props.put(FORCE_PATH_STYLE, forcePathStyle); + props.put(LEGACY_MD5_CHECKSUM, legacyMd5Checksum); this.keyPathPrefix = StringUtils.isEmpty(dlqPathPrefix) ? dlqPathPrefix : enforceDefaultDelimiterOnKeyPathPrefix(dlqPathPrefix); props.put(KEY_PATH_PREFIX, dlqPathPrefix); final PluginSetting dlqPluginSetting = new PluginSetting(S3_PLUGIN_NAME, props); diff --git a/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandlerTest.java b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandlerTest.java index 5af66430d1..2667cb836e 100644 --- a/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandlerTest.java +++ b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandlerTest.java @@ -39,6 +39,16 @@ class DlqPushHandlerTest { private static final String KEY_PATH_PREFIX_VALUE = "dlq/"; + private static final String LEGACY_MD5_CHECKSUM = "legacy_md5_checksum"; + + private static final boolean LEGACY_MD5_CHECKSUM_TRUE = true; + private static final boolean LEGACY_MD5_CHECKSUM_FALSE = false; + + private static final String FORCE_PATH_STYLE = "force_path_style"; + + private static final boolean FORCE_PATH_STYLE_TRUE = true; + private static final boolean FORCE_PATH_STYLE_FALSE = false; + private static final String PIPELINE_NAME = "log-pipeline"; private static final String DLQ_FILE = "local_dlq_file"; @@ -84,6 +94,90 @@ void perform_for_dlq_s3_success() throws IOException { verify(dlqWriter).write(anyList(), anyString(), anyString()); } + @Test + void perform_for_dlq_s3_success_legacymd5checksum_true() throws IOException { + Map props = new HashMap<>(); + props.put(BUCKET,BUCKET_VALUE); + props.put(KEY_PATH_PREFIX,KEY_PATH_PREFIX_VALUE); + props.put(LEGACY_MD5_CHECKSUM,LEGACY_MD5_CHECKSUM_TRUE); + + when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider); + + when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter)); + doNothing().when(dlqWriter).write(anyList(), anyString(), anyString()); + SnsSinkFailedDlqData failedDlqData = new SnsSinkFailedDlqData("topic","message",0); + dlqPushHandler = new DlqPushHandler(null,pluginFactory, BUCKET_VALUE, ROLE, REGION,KEY_PATH_PREFIX_VALUE,LEGACY_MD5_CHECKSUM_TRUE); + + PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, props); + pluginSetting.setPipelineName(PIPELINE_NAME); + dlqPushHandler.perform(pluginSetting, failedDlqData); + Assertions.assertNotNull(pluginFactory); + verify(dlqWriter).write(anyList(), anyString(), anyString()); + } + + @Test + void perform_for_dlq_s3_success_legacymd5checksum_false() throws IOException { + Map props = new HashMap<>(); + props.put(BUCKET,BUCKET_VALUE); + props.put(KEY_PATH_PREFIX,KEY_PATH_PREFIX_VALUE); + props.put(LEGACY_MD5_CHECKSUM,LEGACY_MD5_CHECKSUM_FALSE); + + when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider); + + when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter)); + doNothing().when(dlqWriter).write(anyList(), anyString(), anyString()); + SnsSinkFailedDlqData failedDlqData = new SnsSinkFailedDlqData("topic","message",0); + dlqPushHandler = new DlqPushHandler(null,pluginFactory, BUCKET_VALUE, ROLE, REGION,KEY_PATH_PREFIX_VALUE,LEGACY_MD5_CHECKSUM_FALSE); + + PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, props); + pluginSetting.setPipelineName(PIPELINE_NAME); + dlqPushHandler.perform(pluginSetting, failedDlqData); + Assertions.assertNotNull(pluginFactory); + verify(dlqWriter).write(anyList(), anyString(), anyString()); + } + + @Test + void perform_for_dlq_s3_success_forcepathstyle_true() throws IOException { + Map props = new HashMap<>(); + props.put(BUCKET,BUCKET_VALUE); + props.put(KEY_PATH_PREFIX,KEY_PATH_PREFIX_VALUE); + props.put(FORCE_PATH_STYLE,FORCE_PATH_STYLE_TRUE); + + when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider); + + when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter)); + doNothing().when(dlqWriter).write(anyList(), anyString(), anyString()); + SnsSinkFailedDlqData failedDlqData = new SnsSinkFailedDlqData("topic","message",0); + dlqPushHandler = new DlqPushHandler(null,pluginFactory, BUCKET_VALUE, ROLE, REGION,KEY_PATH_PREFIX_VALUE,FORCE_PATH_STYLE_TRUE); + + PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, props); + pluginSetting.setPipelineName(PIPELINE_NAME); + dlqPushHandler.perform(pluginSetting, failedDlqData); + Assertions.assertNotNull(pluginFactory); + verify(dlqWriter).write(anyList(), anyString(), anyString()); + } + + @Test + void perform_for_dlq_s3_success_forcepathstyle_false() throws IOException { + Map props = new HashMap<>(); + props.put(BUCKET,BUCKET_VALUE); + props.put(KEY_PATH_PREFIX,KEY_PATH_PREFIX_VALUE); + props.put(FORCE_PATH_STYLE,FORCE_PATH_STYLE_FALSE); + + when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider); + + when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter)); + doNothing().when(dlqWriter).write(anyList(), anyString(), anyString()); + SnsSinkFailedDlqData failedDlqData = new SnsSinkFailedDlqData("topic","message",0); + dlqPushHandler = new DlqPushHandler(null,pluginFactory, BUCKET_VALUE, ROLE, REGION,KEY_PATH_PREFIX_VALUE,FORCE_PATH_STYLE_FALSE); + + PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, props); + pluginSetting.setPipelineName(PIPELINE_NAME); + dlqPushHandler.perform(pluginSetting, failedDlqData); + Assertions.assertNotNull(pluginFactory); + verify(dlqWriter).write(anyList(), anyString(), anyString()); + } + @Test void perform_for_dlq_local_file_success(){