Skip to content

Commit 287321c

Browse files
authored
Add legacy MD5 checksum validation (#6790)
S3 DLQ: Add path style (deprecated) access and legacy MD5 checksum validation Signed-off-by: Simon ELBAZ <elbazsimon9@gmail.com>
1 parent 5fa5451 commit 287321c

4 files changed

Lines changed: 164 additions & 4 deletions

File tree

data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfig.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,14 @@
1212
import software.amazon.awssdk.arns.Arn;
1313
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
1414
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
15+
import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
16+
import software.amazon.awssdk.core.checksums.ResponseChecksumValidation;
1517
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
1618
import software.amazon.awssdk.core.retry.RetryPolicy;
1719
import software.amazon.awssdk.regions.Region;
20+
import software.amazon.awssdk.services.s3.LegacyMd5Plugin;
1821
import software.amazon.awssdk.services.s3.S3Client;
22+
import software.amazon.awssdk.services.s3.S3ClientBuilder;
1923
import software.amazon.awssdk.services.sts.StsClient;
2024
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
2125
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
@@ -36,6 +40,8 @@ public class S3DlqWriterConfig {
3640
private static final String AWS_IAM_ROLE = "role";
3741
private static final String AWS_IAM = "iam";
3842
private static final String S3_PREFIX = "s3://";
43+
private static final boolean FORCE_PATH_STYLE = false;
44+
private static final boolean LEGACY_MD5_CHECKSUM = false;
3945

4046
@JsonProperty("bucket")
4147
@NotEmpty
@@ -50,6 +56,12 @@ public class S3DlqWriterConfig {
5056
@Size(min = 1, message = "region cannot be empty string")
5157
private String region = DEFAULT_AWS_REGION;
5258

59+
@JsonProperty("force_path_style")
60+
private boolean forcePathStyle = FORCE_PATH_STYLE;
61+
62+
@JsonProperty("legacy_md5_checksum")
63+
private boolean legacyMd5Checksum = LEGACY_MD5_CHECKSUM;
64+
5365
@JsonProperty("sts_role_arn")
5466
@Size(min = 20, max = 2048, message = "sts_role_arn length should be between 1 and 2048 characters")
5567
private String stsRoleArn;
@@ -76,6 +88,14 @@ public String getKeyPathPrefix() {
7688
return keyPathPrefix;
7789
}
7890

91+
public boolean getForcePathStyle() {
92+
return forcePathStyle;
93+
}
94+
95+
public boolean getLegacyMd5Checksum() {
96+
return legacyMd5Checksum;
97+
}
98+
7999
public Region getRegion() {
80100
return Region.of(region);
81101
}
@@ -133,12 +153,20 @@ private Arn getArn() {
133153
}
134154

135155
public S3Client getS3Client() {
136-
return S3Client.builder()
156+
S3ClientBuilder s3ClientBuilder = S3Client.builder()
137157
.region(this.getRegion())
158+
.forcePathStyle(this.getForcePathStyle())
138159
.credentialsProvider(this.getAwsCredentialsProvider())
139160
.overrideConfiguration(ClientOverrideConfiguration.builder()
140161
.retryPolicy(RetryPolicy.builder().numRetries(MAX_NUMBER_OF_RETRIES).build())
141-
.build())
142-
.build();
162+
.build());
163+
164+
if (legacyMd5Checksum == true) {
165+
s3ClientBuilder.requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED)
166+
.responseChecksumValidation(ResponseChecksumValidation.WHEN_REQUIRED)
167+
.addPlugin(LegacyMd5Plugin.create());
168+
}
169+
170+
return s3ClientBuilder.build();
143171
}
144172
}

data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,16 @@ public void testDefaultKeyPathPrefix() {
3535
assertThat(new S3DlqWriterConfig().getKeyPathPrefix(), is(equalTo(null)));
3636
}
3737

38+
@Test
39+
public void testDefaultForcePathStyle() {
40+
assertThat(new S3DlqWriterConfig().getForcePathStyle(), is(equalTo(false)));
41+
}
42+
43+
@Test
44+
public void testDefaultLegacyMd5Checksum() {
45+
assertThat(new S3DlqWriterConfig().getLegacyMd5Checksum(), is(equalTo(false)));
46+
}
47+
3848
@ParameterizedTest
3949
@ValueSource(strings = {"foobar", "arn:aws:es:us-west-2:123456789012:domain/bogus-domain",
4050
"arn:aws:iam::123456789012:group/bogus-group"})
@@ -62,6 +72,24 @@ public void getS3ClientWithValidStsRoleArn(final String stsRoleArn) throws NoSuc
6272
assertThat(s3Client, is(notNullValue()));
6373
}
6474

75+
@ParameterizedTest
76+
@ValueSource(booleans = {false, true})
77+
public void getS3ClientWithValidAccessStyle(final boolean forcePathStyle) throws NoSuchFieldException, IllegalAccessException {
78+
final S3DlqWriterConfig config = new S3DlqWriterConfig();
79+
reflectivelySetField(config, "forcePathStyle", forcePathStyle);
80+
final S3Client s3Client = config.getS3Client();
81+
assertThat(s3Client, is(notNullValue()));
82+
}
83+
84+
@ParameterizedTest
85+
@ValueSource(booleans = {false, true})
86+
public void getS3ClientWithLegacyMd5Checksum(final boolean legacyMd5Checksum) throws NoSuchFieldException, IllegalAccessException {
87+
final S3DlqWriterConfig config = new S3DlqWriterConfig();
88+
reflectivelySetField(config, "legacyMd5Checksum", legacyMd5Checksum);
89+
final S3Client s3Client = config.getS3Client();
90+
assertThat(s3Client, is(notNullValue()));
91+
}
92+
6593
@ParameterizedTest
6694
@NullSource
6795
@ValueSource(strings = {"", "arn:aws:iam::123456789012:role/some-role"})

data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandler.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ public class DlqPushHandler {
4949

5050
private static final String KEY_PATH_PREFIX = "key_path_prefix";
5151

52+
private static final String FORCE_PATH_STYLE = "force_path_style";
53+
54+
private static final String LEGACY_MD5_CHECKSUM = "legacy_md5_checksum";
55+
5256
private String dlqFile;
5357

5458
private String keyPathPrefix;
@@ -62,12 +66,14 @@ public DlqPushHandler(final String dlqFile,
6266
final String bucket,
6367
final String stsRoleArn,
6468
final String awsRegion,
69+
final Boolean forcePathStyle,
70+
final Boolean legacyMd5Checksum,
6571
final String dlqPathPrefix) {
6672
if(dlqFile != null) {
6773
this.dlqFile = dlqFile;
6874
this.objectWriter = new ObjectMapper().writer().withDefaultPrettyPrinter();
6975
}else{
70-
this.dlqProvider = getDlqProvider(pluginFactory,bucket,stsRoleArn,awsRegion,dlqPathPrefix);
76+
this.dlqProvider = getDlqProvider(pluginFactory,bucket,stsRoleArn,awsRegion,forcePathStyle,legacyMd5Checksum,dlqPathPrefix);
7177
}
7278
}
7379

@@ -117,11 +123,15 @@ private DlqProvider getDlqProvider(final PluginFactory pluginFactory,
117123
final String bucket,
118124
final String stsRoleArn,
119125
final String awsRegion,
126+
final Boolean forcePathStyle,
127+
final Boolean legacyMd5Checksum,
120128
final String dlqPathPrefix) {
121129
final Map<String, Object> props = new HashMap<>();
122130
props.put(BUCKET, bucket);
123131
props.put(ROLE_ARN, stsRoleArn);
124132
props.put(REGION, awsRegion);
133+
props.put(FORCE_PATH_STYLE, forcePathStyle);
134+
props.put(LEGACY_MD5_CHECKSUM, legacyMd5Checksum);
125135
this.keyPathPrefix = StringUtils.isEmpty(dlqPathPrefix) ? dlqPathPrefix : enforceDefaultDelimiterOnKeyPathPrefix(dlqPathPrefix);
126136
props.put(KEY_PATH_PREFIX, dlqPathPrefix);
127137
final PluginSetting dlqPluginSetting = new PluginSetting(S3_PLUGIN_NAME, props);

data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandlerTest.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,16 @@ class DlqPushHandlerTest {
3939

4040
private static final String KEY_PATH_PREFIX_VALUE = "dlq/";
4141

42+
private static final String LEGACY_MD5_CHECKSUM = "legacy_md5_checksum";
43+
44+
private static final boolean LEGACY_MD5_CHECKSUM_TRUE = true;
45+
private static final boolean LEGACY_MD5_CHECKSUM_FALSE = false;
46+
47+
private static final String FORCE_PATH_STYLE = "force_path_style";
48+
49+
private static final boolean FORCE_PATH_STYLE_TRUE = true;
50+
private static final boolean FORCE_PATH_STYLE_FALSE = false;
51+
4252
private static final String PIPELINE_NAME = "log-pipeline";
4353

4454
private static final String DLQ_FILE = "local_dlq_file";
@@ -84,6 +94,90 @@ void perform_for_dlq_s3_success() throws IOException {
8494
verify(dlqWriter).write(anyList(), anyString(), anyString());
8595
}
8696

97+
@Test
98+
void perform_for_dlq_s3_success_legacymd5checksum_true() throws IOException {
99+
Map<String, Object> props = new HashMap<>();
100+
props.put(BUCKET,BUCKET_VALUE);
101+
props.put(KEY_PATH_PREFIX,KEY_PATH_PREFIX_VALUE);
102+
props.put(LEGACY_MD5_CHECKSUM,LEGACY_MD5_CHECKSUM_TRUE);
103+
104+
when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider);
105+
106+
when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter));
107+
doNothing().when(dlqWriter).write(anyList(), anyString(), anyString());
108+
SnsSinkFailedDlqData failedDlqData = new SnsSinkFailedDlqData("topic","message",0);
109+
dlqPushHandler = new DlqPushHandler(null,pluginFactory, BUCKET_VALUE, ROLE, REGION,KEY_PATH_PREFIX_VALUE,LEGACY_MD5_CHECKSUM_TRUE);
110+
111+
PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, props);
112+
pluginSetting.setPipelineName(PIPELINE_NAME);
113+
dlqPushHandler.perform(pluginSetting, failedDlqData);
114+
Assertions.assertNotNull(pluginFactory);
115+
verify(dlqWriter).write(anyList(), anyString(), anyString());
116+
}
117+
118+
@Test
119+
void perform_for_dlq_s3_success_legacymd5checksum_false() throws IOException {
120+
Map<String, Object> props = new HashMap<>();
121+
props.put(BUCKET,BUCKET_VALUE);
122+
props.put(KEY_PATH_PREFIX,KEY_PATH_PREFIX_VALUE);
123+
props.put(LEGACY_MD5_CHECKSUM,LEGACY_MD5_CHECKSUM_FALSE);
124+
125+
when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider);
126+
127+
when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter));
128+
doNothing().when(dlqWriter).write(anyList(), anyString(), anyString());
129+
SnsSinkFailedDlqData failedDlqData = new SnsSinkFailedDlqData("topic","message",0);
130+
dlqPushHandler = new DlqPushHandler(null,pluginFactory, BUCKET_VALUE, ROLE, REGION,KEY_PATH_PREFIX_VALUE,LEGACY_MD5_CHECKSUM_FALSE);
131+
132+
PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, props);
133+
pluginSetting.setPipelineName(PIPELINE_NAME);
134+
dlqPushHandler.perform(pluginSetting, failedDlqData);
135+
Assertions.assertNotNull(pluginFactory);
136+
verify(dlqWriter).write(anyList(), anyString(), anyString());
137+
}
138+
139+
@Test
140+
void perform_for_dlq_s3_success_forcepathstyle_true() throws IOException {
141+
Map<String, Object> props = new HashMap<>();
142+
props.put(BUCKET,BUCKET_VALUE);
143+
props.put(KEY_PATH_PREFIX,KEY_PATH_PREFIX_VALUE);
144+
props.put(FORCE_PATH_STYLE,FORCE_PATH_STYLE_TRUE);
145+
146+
when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider);
147+
148+
when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter));
149+
doNothing().when(dlqWriter).write(anyList(), anyString(), anyString());
150+
SnsSinkFailedDlqData failedDlqData = new SnsSinkFailedDlqData("topic","message",0);
151+
dlqPushHandler = new DlqPushHandler(null,pluginFactory, BUCKET_VALUE, ROLE, REGION,KEY_PATH_PREFIX_VALUE,FORCE_PATH_STYLE_TRUE);
152+
153+
PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, props);
154+
pluginSetting.setPipelineName(PIPELINE_NAME);
155+
dlqPushHandler.perform(pluginSetting, failedDlqData);
156+
Assertions.assertNotNull(pluginFactory);
157+
verify(dlqWriter).write(anyList(), anyString(), anyString());
158+
}
159+
160+
@Test
161+
void perform_for_dlq_s3_success_forcepathstyle_false() throws IOException {
162+
Map<String, Object> props = new HashMap<>();
163+
props.put(BUCKET,BUCKET_VALUE);
164+
props.put(KEY_PATH_PREFIX,KEY_PATH_PREFIX_VALUE);
165+
props.put(FORCE_PATH_STYLE,FORCE_PATH_STYLE_FALSE);
166+
167+
when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider);
168+
169+
when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter));
170+
doNothing().when(dlqWriter).write(anyList(), anyString(), anyString());
171+
SnsSinkFailedDlqData failedDlqData = new SnsSinkFailedDlqData("topic","message",0);
172+
dlqPushHandler = new DlqPushHandler(null,pluginFactory, BUCKET_VALUE, ROLE, REGION,KEY_PATH_PREFIX_VALUE,FORCE_PATH_STYLE_FALSE);
173+
174+
PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, props);
175+
pluginSetting.setPipelineName(PIPELINE_NAME);
176+
dlqPushHandler.perform(pluginSetting, failedDlqData);
177+
Assertions.assertNotNull(pluginFactory);
178+
verify(dlqWriter).write(anyList(), anyString(), anyString());
179+
}
180+
87181

88182
@Test
89183
void perform_for_dlq_local_file_success(){

0 commit comments

Comments
 (0)