From 303b79a8e130bcb88b00bb0f7c6c0eed9ef2eddd Mon Sep 17 00:00:00 2001 From: Simon ELBAZ Date: Wed, 10 Dec 2025 09:51:46 +0000 Subject: [PATCH 1/3] S3 DLQ: Add path style (deprecated) access Signed-off-by: Simon ELBAZ --- .../plugins/dlq/s3/S3DlqWriterConfig.java | 9 ++++ .../plugins/dlq/s3/S3DlqWriterConfigTest.java | 15 ++++++ .../plugins/sink/sns/dlq/DlqPushHandler.java | 7 ++- .../sink/sns/dlq/DlqPushHandlerTest.java | 47 +++++++++++++++++++ 4 files changed, 77 insertions(+), 1 deletion(-) diff --git a/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfig.java b/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfig.java index 8a7e2a97c9..b3845c3b01 100644 --- a/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfig.java +++ b/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfig.java @@ -36,6 +36,7 @@ public class S3DlqWriterConfig { private static final String AWS_IAM_ROLE = "role"; private static final String AWS_IAM = "iam"; private static final String S3_PREFIX = "s3://"; + private static final boolean FORCE_PATH_STYLE = false; @JsonProperty("bucket") @NotEmpty @@ -50,6 +51,9 @@ public class S3DlqWriterConfig { @Size(min = 1, message = "region cannot be empty string") private String region = DEFAULT_AWS_REGION; + @JsonProperty("force_path_style") + private boolean forcePathStyle = FORCE_PATH_STYLE; + @JsonProperty("sts_role_arn") @Size(min = 20, max = 2048, message = "sts_role_arn length should be between 1 and 2048 characters") private String stsRoleArn; @@ -76,6 +80,10 @@ public String getKeyPathPrefix() { return keyPathPrefix; } + public boolean getForcePathStyle() { + return forcePathStyle; + } + public Region getRegion() { return Region.of(region); } @@ -135,6 +143,7 @@ private Arn getArn() { public S3Client getS3Client() { return S3Client.builder() .region(this.getRegion()) + .forcePathStyle(this.getForcePathStyle()) .credentialsProvider(this.getAwsCredentialsProvider()) .overrideConfiguration(ClientOverrideConfiguration.builder() .retryPolicy(RetryPolicy.builder().numRetries(MAX_NUMBER_OF_RETRIES).build()) diff --git a/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java b/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java index c57b6e3d47..9bda5d47ea 100644 --- a/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java +++ b/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java @@ -35,6 +35,11 @@ public void testDefaultKeyPathPrefix() { assertThat(new S3DlqWriterConfig().getKeyPathPrefix(), is(equalTo(null))); } + @Test + public void testDefaultForcePathStyle() { + assertThat(new S3DlqWriterConfig().getForcePathStyle(), is(equalTo(false))); + } + @ParameterizedTest @ValueSource(strings = {"foobar", "arn:aws:es:us-west-2:123456789012:domain/bogus-domain", "arn:aws:iam::123456789012:group/bogus-group"}) @@ -62,6 +67,16 @@ public void getS3ClientWithValidStsRoleArn(final String stsRoleArn) throws NoSuc assertThat(s3Client, is(notNullValue())); } + @ParameterizedTest + @NullSource + @ValueSource(booleans = {false, true}) + public void getS3ClientWithValidAccessStyle(final boolean forcePathStyle) throws NoSuchFieldException, IllegalAccessException { + final S3DlqWriterConfig config = new S3DlqWriterConfig(); + reflectivelySetField(config, "force_path_style", forcePathStyle); + final S3Client s3Client = config.getS3Client(); + assertThat(s3Client, is(notNullValue())); + } + @ParameterizedTest @NullSource @ValueSource(strings = {"", "arn:aws:iam::123456789012:role/some-role"}) diff --git a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandler.java b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandler.java index a3b6dd4995..82321464ef 100644 --- a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandler.java +++ b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandler.java @@ -49,6 +49,8 @@ public class DlqPushHandler { private static final String KEY_PATH_PREFIX = "key_path_prefix"; + private static final String FORCE_PATH_STYLE = "force_path_style"; + private String dlqFile; private String keyPathPrefix; @@ -62,12 +64,13 @@ public DlqPushHandler(final String dlqFile, final String bucket, final String stsRoleArn, final String awsRegion, + final Boolean forcePathStyle, final String dlqPathPrefix) { if(dlqFile != null) { this.dlqFile = dlqFile; this.objectWriter = new ObjectMapper().writer().withDefaultPrettyPrinter(); }else{ - this.dlqProvider = getDlqProvider(pluginFactory,bucket,stsRoleArn,awsRegion,dlqPathPrefix); + this.dlqProvider = getDlqProvider(pluginFactory,bucket,stsRoleArn,awsRegion,forcePathStyle,dlqPathPrefix); } } @@ -117,11 +120,13 @@ private DlqProvider getDlqProvider(final PluginFactory pluginFactory, final String bucket, final String stsRoleArn, final String awsRegion, + final Boolean forcePathStyle, final String dlqPathPrefix) { final Map props = new HashMap<>(); props.put(BUCKET, bucket); props.put(ROLE_ARN, stsRoleArn); props.put(REGION, awsRegion); + props.put(FORCE_PATH_STYLE, forcePathStyle); this.keyPathPrefix = StringUtils.isEmpty(dlqPathPrefix) ? dlqPathPrefix : enforceDefaultDelimiterOnKeyPathPrefix(dlqPathPrefix); props.put(KEY_PATH_PREFIX, dlqPathPrefix); final PluginSetting dlqPluginSetting = new PluginSetting(S3_PLUGIN_NAME, props); diff --git a/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandlerTest.java b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandlerTest.java index 5af66430d1..cbf58c80d9 100644 --- a/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandlerTest.java +++ b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandlerTest.java @@ -39,6 +39,11 @@ class DlqPushHandlerTest { private static final String KEY_PATH_PREFIX_VALUE = "dlq/"; + private static final String FORCE_PATH_STYLE = "force_path_style"; + + private static final boolean FORCE_PATH_STYLE_TRUE = true; + private static final boolean FORCE_PATH_STYLE_FALSE = false; + private static final String PIPELINE_NAME = "log-pipeline"; private static final String DLQ_FILE = "local_dlq_file"; @@ -84,6 +89,48 @@ void perform_for_dlq_s3_success() throws IOException { verify(dlqWriter).write(anyList(), anyString(), anyString()); } + @Test + void perform_for_dlq_s3_success_forcepathstyle_true() throws IOException { + Map props = new HashMap<>(); + props.put(BUCKET,BUCKET_VALUE); + props.put(KEY_PATH_PREFIX,KEY_PATH_PREFIX_VALUE); + props.put(FORCE_PATH_STYLE,FORCE_PATH_STYLE_TRUE); + + when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider); + + when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter)); + doNothing().when(dlqWriter).write(anyList(), anyString(), anyString()); + SnsSinkFailedDlqData failedDlqData = new SnsSinkFailedDlqData("topic","message",0); + dlqPushHandler = new DlqPushHandler(null,pluginFactory, BUCKET_VALUE, ROLE, REGION,KEY_PATH_PREFIX_VALUE,FORCE_PATH_STYLE_TRUE); + + PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, props); + pluginSetting.setPipelineName(PIPELINE_NAME); + dlqPushHandler.perform(pluginSetting, failedDlqData); + Assertions.assertNotNull(pluginFactory); + verify(dlqWriter).write(anyList(), anyString(), anyString()); + } + + @Test + void perform_for_dlq_s3_success_forcepathstyle_false() throws IOException { + Map props = new HashMap<>(); + props.put(BUCKET,BUCKET_VALUE); + props.put(KEY_PATH_PREFIX,KEY_PATH_PREFIX_VALUE); + props.put(FORCE_PATH_STYLE,FORCE_PATH_STYLE_FALSE); + + when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider); + + when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter)); + doNothing().when(dlqWriter).write(anyList(), anyString(), anyString()); + SnsSinkFailedDlqData failedDlqData = new SnsSinkFailedDlqData("topic","message",0); + dlqPushHandler = new DlqPushHandler(null,pluginFactory, BUCKET_VALUE, ROLE, REGION,KEY_PATH_PREFIX_VALUE,FORCE_PATH_STYLE_FALSE); + + PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, props); + pluginSetting.setPipelineName(PIPELINE_NAME); + dlqPushHandler.perform(pluginSetting, failedDlqData); + Assertions.assertNotNull(pluginFactory); + verify(dlqWriter).write(anyList(), anyString(), anyString()); + } + @Test void perform_for_dlq_local_file_success(){ From f5a58d0af0eb165c62d40f2839284d39ecd5e34d Mon Sep 17 00:00:00 2001 From: Simon ELBAZ Date: Sat, 31 Jan 2026 22:09:49 +0000 Subject: [PATCH 2/3] Fix flaky tests Signed-off-by: Simon ELBAZ --- .../dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java b/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java index 9bda5d47ea..4bffdbd307 100644 --- a/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java +++ b/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java @@ -68,11 +68,10 @@ public void getS3ClientWithValidStsRoleArn(final String stsRoleArn) throws NoSuc } @ParameterizedTest - @NullSource @ValueSource(booleans = {false, true}) public void getS3ClientWithValidAccessStyle(final boolean forcePathStyle) throws NoSuchFieldException, IllegalAccessException { final S3DlqWriterConfig config = new S3DlqWriterConfig(); - reflectivelySetField(config, "force_path_style", forcePathStyle); + reflectivelySetField(config, "forcePathStyle", forcePathStyle); final S3Client s3Client = config.getS3Client(); assertThat(s3Client, is(notNullValue())); } From 2b3dd6487a6e25fa2a826c12cca6f471d6d095d1 Mon Sep 17 00:00:00 2001 From: Simon ELBAZ Date: Tue, 28 Apr 2026 19:48:51 +0000 Subject: [PATCH 3/3] S3 DLQ: Add legacy MD5 checksum validation Signed-off-by: Simon ELBAZ --- .../plugins/dlq/s3/S3DlqWriterConfig.java | 25 ++++++++-- .../plugins/dlq/s3/S3DlqWriterConfigTest.java | 14 ++++++ .../plugins/sink/sns/dlq/DlqPushHandler.java | 7 ++- .../sink/sns/dlq/DlqPushHandlerTest.java | 47 +++++++++++++++++++ 4 files changed, 89 insertions(+), 4 deletions(-) diff --git a/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfig.java b/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfig.java index b3845c3b01..2c747b91be 100644 --- a/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfig.java +++ b/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfig.java @@ -12,10 +12,14 @@ import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.core.checksums.RequestChecksumCalculation; +import software.amazon.awssdk.core.checksums.ResponseChecksumValidation; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.retry.RetryPolicy; import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.LegacyMd5Plugin; import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; import software.amazon.awssdk.services.sts.StsClient; import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; @@ -37,6 +41,7 @@ public class S3DlqWriterConfig { private static final String AWS_IAM = "iam"; private static final String S3_PREFIX = "s3://"; private static final boolean FORCE_PATH_STYLE = false; + private static final boolean LEGACY_MD5_CHECKSUM = false; @JsonProperty("bucket") @NotEmpty @@ -54,6 +59,9 @@ public class S3DlqWriterConfig { @JsonProperty("force_path_style") private boolean forcePathStyle = FORCE_PATH_STYLE; + @JsonProperty("legacy_md5_checksum") + private boolean legacyMd5Checksum = LEGACY_MD5_CHECKSUM; + @JsonProperty("sts_role_arn") @Size(min = 20, max = 2048, message = "sts_role_arn length should be between 1 and 2048 characters") private String stsRoleArn; @@ -84,6 +92,10 @@ public boolean getForcePathStyle() { return forcePathStyle; } + public boolean getLegacyMd5Checksum() { + return legacyMd5Checksum; + } + public Region getRegion() { return Region.of(region); } @@ -141,13 +153,20 @@ private Arn getArn() { } public S3Client getS3Client() { - return S3Client.builder() + S3ClientBuilder s3ClientBuilder = S3Client.builder() .region(this.getRegion()) .forcePathStyle(this.getForcePathStyle()) .credentialsProvider(this.getAwsCredentialsProvider()) .overrideConfiguration(ClientOverrideConfiguration.builder() .retryPolicy(RetryPolicy.builder().numRetries(MAX_NUMBER_OF_RETRIES).build()) - .build()) - .build(); + .build()); + + if (legacyMd5Checksum == true) { + s3ClientBuilder.requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED) + .responseChecksumValidation(ResponseChecksumValidation.WHEN_REQUIRED) + .addPlugin(LegacyMd5Plugin.create()); + } + + return s3ClientBuilder.build(); } } diff --git a/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java b/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java index 4bffdbd307..dcb4021f61 100644 --- a/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java +++ b/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java @@ -40,6 +40,11 @@ public void testDefaultForcePathStyle() { assertThat(new S3DlqWriterConfig().getForcePathStyle(), is(equalTo(false))); } + @Test + public void testDefaultLegacyMd5Checksum() { + assertThat(new S3DlqWriterConfig().getLegacyMd5Checksum(), is(equalTo(false))); + } + @ParameterizedTest @ValueSource(strings = {"foobar", "arn:aws:es:us-west-2:123456789012:domain/bogus-domain", "arn:aws:iam::123456789012:group/bogus-group"}) @@ -76,6 +81,15 @@ public void getS3ClientWithValidAccessStyle(final boolean forcePathStyle) throws assertThat(s3Client, is(notNullValue())); } + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void getS3ClientWithLegacyMd5Checksum(final boolean legacyMd5Checksum) throws NoSuchFieldException, IllegalAccessException { + final S3DlqWriterConfig config = new S3DlqWriterConfig(); + reflectivelySetField(config, "legacyMd5Checksum", legacyMd5Checksum); + final S3Client s3Client = config.getS3Client(); + assertThat(s3Client, is(notNullValue())); + } + @ParameterizedTest @NullSource @ValueSource(strings = {"", "arn:aws:iam::123456789012:role/some-role"}) diff --git a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandler.java b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandler.java index 82321464ef..c3101e34f6 100644 --- a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandler.java +++ b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandler.java @@ -51,6 +51,8 @@ public class DlqPushHandler { private static final String FORCE_PATH_STYLE = "force_path_style"; + private static final String LEGACY_MD5_CHECKSUM = "legacy_md5_checksum"; + private String dlqFile; private String keyPathPrefix; @@ -65,12 +67,13 @@ public DlqPushHandler(final String dlqFile, final String stsRoleArn, final String awsRegion, final Boolean forcePathStyle, + final Boolean legacyMd5Checksum, final String dlqPathPrefix) { if(dlqFile != null) { this.dlqFile = dlqFile; this.objectWriter = new ObjectMapper().writer().withDefaultPrettyPrinter(); }else{ - this.dlqProvider = getDlqProvider(pluginFactory,bucket,stsRoleArn,awsRegion,forcePathStyle,dlqPathPrefix); + this.dlqProvider = getDlqProvider(pluginFactory,bucket,stsRoleArn,awsRegion,forcePathStyle,legacyMd5Checksum,dlqPathPrefix); } } @@ -121,12 +124,14 @@ private DlqProvider getDlqProvider(final PluginFactory pluginFactory, final String stsRoleArn, final String awsRegion, final Boolean forcePathStyle, + final Boolean legacyMd5Checksum, final String dlqPathPrefix) { final Map props = new HashMap<>(); props.put(BUCKET, bucket); props.put(ROLE_ARN, stsRoleArn); props.put(REGION, awsRegion); props.put(FORCE_PATH_STYLE, forcePathStyle); + props.put(LEGACY_MD5_CHECKSUM, legacyMd5Checksum); this.keyPathPrefix = StringUtils.isEmpty(dlqPathPrefix) ? dlqPathPrefix : enforceDefaultDelimiterOnKeyPathPrefix(dlqPathPrefix); props.put(KEY_PATH_PREFIX, dlqPathPrefix); final PluginSetting dlqPluginSetting = new PluginSetting(S3_PLUGIN_NAME, props); diff --git a/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandlerTest.java b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandlerTest.java index cbf58c80d9..2667cb836e 100644 --- a/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandlerTest.java +++ b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandlerTest.java @@ -39,6 +39,11 @@ class DlqPushHandlerTest { private static final String KEY_PATH_PREFIX_VALUE = "dlq/"; + private static final String LEGACY_MD5_CHECKSUM = "legacy_md5_checksum"; + + private static final boolean LEGACY_MD5_CHECKSUM_TRUE = true; + private static final boolean LEGACY_MD5_CHECKSUM_FALSE = false; + private static final String FORCE_PATH_STYLE = "force_path_style"; private static final boolean FORCE_PATH_STYLE_TRUE = true; @@ -89,6 +94,48 @@ void perform_for_dlq_s3_success() throws IOException { verify(dlqWriter).write(anyList(), anyString(), anyString()); } + @Test + void perform_for_dlq_s3_success_legacymd5checksum_true() throws IOException { + Map props = new HashMap<>(); + props.put(BUCKET,BUCKET_VALUE); + props.put(KEY_PATH_PREFIX,KEY_PATH_PREFIX_VALUE); + props.put(LEGACY_MD5_CHECKSUM,LEGACY_MD5_CHECKSUM_TRUE); + + when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider); + + when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter)); + doNothing().when(dlqWriter).write(anyList(), anyString(), anyString()); + SnsSinkFailedDlqData failedDlqData = new SnsSinkFailedDlqData("topic","message",0); + dlqPushHandler = new DlqPushHandler(null,pluginFactory, BUCKET_VALUE, ROLE, REGION,KEY_PATH_PREFIX_VALUE,LEGACY_MD5_CHECKSUM_TRUE); + + PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, props); + pluginSetting.setPipelineName(PIPELINE_NAME); + dlqPushHandler.perform(pluginSetting, failedDlqData); + Assertions.assertNotNull(pluginFactory); + verify(dlqWriter).write(anyList(), anyString(), anyString()); + } + + @Test + void perform_for_dlq_s3_success_legacymd5checksum_false() throws IOException { + Map props = new HashMap<>(); + props.put(BUCKET,BUCKET_VALUE); + props.put(KEY_PATH_PREFIX,KEY_PATH_PREFIX_VALUE); + props.put(LEGACY_MD5_CHECKSUM,LEGACY_MD5_CHECKSUM_FALSE); + + when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider); + + when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter)); + doNothing().when(dlqWriter).write(anyList(), anyString(), anyString()); + SnsSinkFailedDlqData failedDlqData = new SnsSinkFailedDlqData("topic","message",0); + dlqPushHandler = new DlqPushHandler(null,pluginFactory, BUCKET_VALUE, ROLE, REGION,KEY_PATH_PREFIX_VALUE,LEGACY_MD5_CHECKSUM_FALSE); + + PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, props); + pluginSetting.setPipelineName(PIPELINE_NAME); + dlqPushHandler.perform(pluginSetting, failedDlqData); + Assertions.assertNotNull(pluginFactory); + verify(dlqWriter).write(anyList(), anyString(), anyString()); + } + @Test void perform_for_dlq_s3_success_forcepathstyle_true() throws IOException { Map props = new HashMap<>();