Skip to content

Commit 2b3dd64

Browse files
committed
S3 DLQ: Add legacy MD5 checksum validation
Signed-off-by: Simon ELBAZ <elbazsimon9@gmail.com>
1 parent f5a58d0 commit 2b3dd64

4 files changed

Lines changed: 89 additions & 4 deletions

File tree

data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfig.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,14 @@
1212
import software.amazon.awssdk.arns.Arn;
1313
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
1414
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
15+
import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
16+
import software.amazon.awssdk.core.checksums.ResponseChecksumValidation;
1517
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
1618
import software.amazon.awssdk.core.retry.RetryPolicy;
1719
import software.amazon.awssdk.regions.Region;
20+
import software.amazon.awssdk.services.s3.LegacyMd5Plugin;
1821
import software.amazon.awssdk.services.s3.S3Client;
22+
import software.amazon.awssdk.services.s3.S3ClientBuilder;
1923
import software.amazon.awssdk.services.sts.StsClient;
2024
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
2125
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
@@ -37,6 +41,7 @@ public class S3DlqWriterConfig {
3741
private static final String AWS_IAM = "iam";
3842
private static final String S3_PREFIX = "s3://";
3943
private static final boolean FORCE_PATH_STYLE = false;
44+
private static final boolean LEGACY_MD5_CHECKSUM = false;
4045

4146
@JsonProperty("bucket")
4247
@NotEmpty
@@ -54,6 +59,9 @@ public class S3DlqWriterConfig {
5459
@JsonProperty("force_path_style")
5560
private boolean forcePathStyle = FORCE_PATH_STYLE;
5661

62+
@JsonProperty("legacy_md5_checksum")
63+
private boolean legacyMd5Checksum = LEGACY_MD5_CHECKSUM;
64+
5765
@JsonProperty("sts_role_arn")
5866
@Size(min = 20, max = 2048, message = "sts_role_arn length should be between 1 and 2048 characters")
5967
private String stsRoleArn;
@@ -84,6 +92,10 @@ public boolean getForcePathStyle() {
8492
return forcePathStyle;
8593
}
8694

95+
public boolean getLegacyMd5Checksum() {
96+
return legacyMd5Checksum;
97+
}
98+
8799
public Region getRegion() {
88100
return Region.of(region);
89101
}
@@ -141,13 +153,20 @@ private Arn getArn() {
141153
}
142154

143155
public S3Client getS3Client() {
144-
return S3Client.builder()
156+
S3ClientBuilder s3ClientBuilder = S3Client.builder()
145157
.region(this.getRegion())
146158
.forcePathStyle(this.getForcePathStyle())
147159
.credentialsProvider(this.getAwsCredentialsProvider())
148160
.overrideConfiguration(ClientOverrideConfiguration.builder()
149161
.retryPolicy(RetryPolicy.builder().numRetries(MAX_NUMBER_OF_RETRIES).build())
150-
.build())
151-
.build();
162+
.build());
163+
164+
if (legacyMd5Checksum == true) {
165+
s3ClientBuilder.requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED)
166+
.responseChecksumValidation(ResponseChecksumValidation.WHEN_REQUIRED)
167+
.addPlugin(LegacyMd5Plugin.create());
168+
}
169+
170+
return s3ClientBuilder.build();
152171
}
153172
}

data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ public void testDefaultForcePathStyle() {
4040
assertThat(new S3DlqWriterConfig().getForcePathStyle(), is(equalTo(false)));
4141
}
4242

43+
@Test
44+
public void testDefaultLegacyMd5Checksum() {
45+
assertThat(new S3DlqWriterConfig().getLegacyMd5Checksum(), is(equalTo(false)));
46+
}
47+
4348
@ParameterizedTest
4449
@ValueSource(strings = {"foobar", "arn:aws:es:us-west-2:123456789012:domain/bogus-domain",
4550
"arn:aws:iam::123456789012:group/bogus-group"})
@@ -76,6 +81,15 @@ public void getS3ClientWithValidAccessStyle(final boolean forcePathStyle) throws
7681
assertThat(s3Client, is(notNullValue()));
7782
}
7883

84+
@ParameterizedTest
85+
@ValueSource(booleans = {false, true})
86+
public void getS3ClientWithLegacyMd5Checksum(final boolean legacyMd5Checksum) throws NoSuchFieldException, IllegalAccessException {
87+
final S3DlqWriterConfig config = new S3DlqWriterConfig();
88+
reflectivelySetField(config, "legacyMd5Checksum", legacyMd5Checksum);
89+
final S3Client s3Client = config.getS3Client();
90+
assertThat(s3Client, is(notNullValue()));
91+
}
92+
7993
@ParameterizedTest
8094
@NullSource
8195
@ValueSource(strings = {"", "arn:aws:iam::123456789012:role/some-role"})

data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandler.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ public class DlqPushHandler {
5151

5252
private static final String FORCE_PATH_STYLE = "force_path_style";
5353

54+
private static final String LEGACY_MD5_CHECKSUM = "legacy_md5_checksum";
55+
5456
private String dlqFile;
5557

5658
private String keyPathPrefix;
@@ -65,12 +67,13 @@ public DlqPushHandler(final String dlqFile,
6567
final String stsRoleArn,
6668
final String awsRegion,
6769
final Boolean forcePathStyle,
70+
final Boolean legacyMd5Checksum,
6871
final String dlqPathPrefix) {
6972
if(dlqFile != null) {
7073
this.dlqFile = dlqFile;
7174
this.objectWriter = new ObjectMapper().writer().withDefaultPrettyPrinter();
7275
}else{
73-
this.dlqProvider = getDlqProvider(pluginFactory,bucket,stsRoleArn,awsRegion,forcePathStyle,dlqPathPrefix);
76+
this.dlqProvider = getDlqProvider(pluginFactory,bucket,stsRoleArn,awsRegion,forcePathStyle,legacyMd5Checksum,dlqPathPrefix);
7477
}
7578
}
7679

@@ -121,12 +124,14 @@ private DlqProvider getDlqProvider(final PluginFactory pluginFactory,
121124
final String stsRoleArn,
122125
final String awsRegion,
123126
final Boolean forcePathStyle,
127+
final Boolean legacyMd5Checksum,
124128
final String dlqPathPrefix) {
125129
final Map<String, Object> props = new HashMap<>();
126130
props.put(BUCKET, bucket);
127131
props.put(ROLE_ARN, stsRoleArn);
128132
props.put(REGION, awsRegion);
129133
props.put(FORCE_PATH_STYLE, forcePathStyle);
134+
props.put(LEGACY_MD5_CHECKSUM, legacyMd5Checksum);
130135
this.keyPathPrefix = StringUtils.isEmpty(dlqPathPrefix) ? dlqPathPrefix : enforceDefaultDelimiterOnKeyPathPrefix(dlqPathPrefix);
131136
props.put(KEY_PATH_PREFIX, dlqPathPrefix);
132137
final PluginSetting dlqPluginSetting = new PluginSetting(S3_PLUGIN_NAME, props);

data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandlerTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ class DlqPushHandlerTest {
3939

4040
private static final String KEY_PATH_PREFIX_VALUE = "dlq/";
4141

42+
private static final String LEGACY_MD5_CHECKSUM = "legacy_md5_checksum";
43+
44+
private static final boolean LEGACY_MD5_CHECKSUM_TRUE = true;
45+
private static final boolean LEGACY_MD5_CHECKSUM_FALSE = false;
46+
4247
private static final String FORCE_PATH_STYLE = "force_path_style";
4348

4449
private static final boolean FORCE_PATH_STYLE_TRUE = true;
@@ -89,6 +94,48 @@ void perform_for_dlq_s3_success() throws IOException {
8994
verify(dlqWriter).write(anyList(), anyString(), anyString());
9095
}
9196

97+
@Test
98+
void perform_for_dlq_s3_success_legacymd5checksum_true() throws IOException {
99+
Map<String, Object> props = new HashMap<>();
100+
props.put(BUCKET,BUCKET_VALUE);
101+
props.put(KEY_PATH_PREFIX,KEY_PATH_PREFIX_VALUE);
102+
props.put(LEGACY_MD5_CHECKSUM,LEGACY_MD5_CHECKSUM_TRUE);
103+
104+
when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider);
105+
106+
when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter));
107+
doNothing().when(dlqWriter).write(anyList(), anyString(), anyString());
108+
SnsSinkFailedDlqData failedDlqData = new SnsSinkFailedDlqData("topic","message",0);
109+
dlqPushHandler = new DlqPushHandler(null,pluginFactory, BUCKET_VALUE, ROLE, REGION,KEY_PATH_PREFIX_VALUE,LEGACY_MD5_CHECKSUM_TRUE);
110+
111+
PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, props);
112+
pluginSetting.setPipelineName(PIPELINE_NAME);
113+
dlqPushHandler.perform(pluginSetting, failedDlqData);
114+
Assertions.assertNotNull(pluginFactory);
115+
verify(dlqWriter).write(anyList(), anyString(), anyString());
116+
}
117+
118+
@Test
119+
void perform_for_dlq_s3_success_legacymd5checksum_false() throws IOException {
120+
Map<String, Object> props = new HashMap<>();
121+
props.put(BUCKET,BUCKET_VALUE);
122+
props.put(KEY_PATH_PREFIX,KEY_PATH_PREFIX_VALUE);
123+
props.put(LEGACY_MD5_CHECKSUM,LEGACY_MD5_CHECKSUM_FALSE);
124+
125+
when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider);
126+
127+
when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter));
128+
doNothing().when(dlqWriter).write(anyList(), anyString(), anyString());
129+
SnsSinkFailedDlqData failedDlqData = new SnsSinkFailedDlqData("topic","message",0);
130+
dlqPushHandler = new DlqPushHandler(null,pluginFactory, BUCKET_VALUE, ROLE, REGION,KEY_PATH_PREFIX_VALUE,LEGACY_MD5_CHECKSUM_FALSE);
131+
132+
PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, props);
133+
pluginSetting.setPipelineName(PIPELINE_NAME);
134+
dlqPushHandler.perform(pluginSetting, failedDlqData);
135+
Assertions.assertNotNull(pluginFactory);
136+
verify(dlqWriter).write(anyList(), anyString(), anyString());
137+
}
138+
92139
@Test
93140
void perform_for_dlq_s3_success_forcepathstyle_true() throws IOException {
94141
Map<String, Object> props = new HashMap<>();

0 commit comments

Comments
 (0)