Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
import software.amazon.awssdk.core.checksums.ResponseChecksumValidation;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.LegacyMd5Plugin;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
Expand All @@ -36,6 +40,8 @@ public class S3DlqWriterConfig {
private static final String AWS_IAM_ROLE = "role";
private static final String AWS_IAM = "iam";
private static final String S3_PREFIX = "s3://";
private static final boolean FORCE_PATH_STYLE = false;
private static final boolean LEGACY_MD5_CHECKSUM = false;

@JsonProperty("bucket")
@NotEmpty
Expand All @@ -50,6 +56,12 @@ public class S3DlqWriterConfig {
@Size(min = 1, message = "region cannot be empty string")
private String region = DEFAULT_AWS_REGION;

@JsonProperty("force_path_style")
private boolean forcePathStyle = FORCE_PATH_STYLE;

@JsonProperty("legacy_md5_checksum")
private boolean legacyMd5Checksum = LEGACY_MD5_CHECKSUM;

@JsonProperty("sts_role_arn")
@Size(min = 20, max = 2048, message = "sts_role_arn length should be between 1 and 2048 characters")
private String stsRoleArn;
Expand All @@ -76,6 +88,14 @@ public String getKeyPathPrefix() {
return keyPathPrefix;
}

public boolean getForcePathStyle() {
return forcePathStyle;
}

public boolean getLegacyMd5Checksum() {
return legacyMd5Checksum;
}

public Region getRegion() {
return Region.of(region);
}
Expand Down Expand Up @@ -133,12 +153,20 @@ private Arn getArn() {
}

public S3Client getS3Client() {
return S3Client.builder()
S3ClientBuilder s3ClientBuilder = S3Client.builder()
.region(this.getRegion())
.forcePathStyle(this.getForcePathStyle())
.credentialsProvider(this.getAwsCredentialsProvider())
.overrideConfiguration(ClientOverrideConfiguration.builder()
.retryPolicy(RetryPolicy.builder().numRetries(MAX_NUMBER_OF_RETRIES).build())
.build())
.build();
.build());

if (legacyMd5Checksum == true) {
s3ClientBuilder.requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED)
.responseChecksumValidation(ResponseChecksumValidation.WHEN_REQUIRED)
.addPlugin(LegacyMd5Plugin.create());
}

return s3ClientBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ public void testDefaultKeyPathPrefix() {
assertThat(new S3DlqWriterConfig().getKeyPathPrefix(), is(equalTo(null)));
}

@Test
public void testDefaultForcePathStyle() {
assertThat(new S3DlqWriterConfig().getForcePathStyle(), is(equalTo(false)));
}

@Test
public void testDefaultLegacyMd5Checksum() {
assertThat(new S3DlqWriterConfig().getLegacyMd5Checksum(), is(equalTo(false)));
}

@ParameterizedTest
@ValueSource(strings = {"foobar", "arn:aws:es:us-west-2:123456789012:domain/bogus-domain",
"arn:aws:iam::123456789012:group/bogus-group"})
Expand Down Expand Up @@ -62,6 +72,24 @@ public void getS3ClientWithValidStsRoleArn(final String stsRoleArn) throws NoSuc
assertThat(s3Client, is(notNullValue()));
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void getS3ClientWithValidAccessStyle(final boolean forcePathStyle) throws NoSuchFieldException, IllegalAccessException {
final S3DlqWriterConfig config = new S3DlqWriterConfig();
reflectivelySetField(config, "forcePathStyle", forcePathStyle);
final S3Client s3Client = config.getS3Client();
assertThat(s3Client, is(notNullValue()));
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void getS3ClientWithLegacyMd5Checksum(final boolean legacyMd5Checksum) throws NoSuchFieldException, IllegalAccessException {
final S3DlqWriterConfig config = new S3DlqWriterConfig();
reflectivelySetField(config, "legacyMd5Checksum", legacyMd5Checksum);
final S3Client s3Client = config.getS3Client();
assertThat(s3Client, is(notNullValue()));
}

@ParameterizedTest
@NullSource
@ValueSource(strings = {"", "arn:aws:iam::123456789012:role/some-role"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ public class DlqPushHandler {

private static final String KEY_PATH_PREFIX = "key_path_prefix";

private static final String FORCE_PATH_STYLE = "force_path_style";

private static final String LEGACY_MD5_CHECKSUM = "legacy_md5_checksum";

private String dlqFile;

private String keyPathPrefix;
Expand All @@ -62,12 +66,14 @@ public DlqPushHandler(final String dlqFile,
final String bucket,
final String stsRoleArn,
final String awsRegion,
final Boolean forcePathStyle,
final Boolean legacyMd5Checksum,
final String dlqPathPrefix) {
if(dlqFile != null) {
this.dlqFile = dlqFile;
this.objectWriter = new ObjectMapper().writer().withDefaultPrettyPrinter();
}else{
this.dlqProvider = getDlqProvider(pluginFactory,bucket,stsRoleArn,awsRegion,dlqPathPrefix);
this.dlqProvider = getDlqProvider(pluginFactory,bucket,stsRoleArn,awsRegion,forcePathStyle,legacyMd5Checksum,dlqPathPrefix);
}
}

Expand Down Expand Up @@ -117,11 +123,15 @@ private DlqProvider getDlqProvider(final PluginFactory pluginFactory,
final String bucket,
final String stsRoleArn,
final String awsRegion,
final Boolean forcePathStyle,
final Boolean legacyMd5Checksum,
final String dlqPathPrefix) {
final Map<String, Object> props = new HashMap<>();
props.put(BUCKET, bucket);
props.put(ROLE_ARN, stsRoleArn);
props.put(REGION, awsRegion);
props.put(FORCE_PATH_STYLE, forcePathStyle);
props.put(LEGACY_MD5_CHECKSUM, legacyMd5Checksum);
this.keyPathPrefix = StringUtils.isEmpty(dlqPathPrefix) ? dlqPathPrefix : enforceDefaultDelimiterOnKeyPathPrefix(dlqPathPrefix);
props.put(KEY_PATH_PREFIX, dlqPathPrefix);
final PluginSetting dlqPluginSetting = new PluginSetting(S3_PLUGIN_NAME, props);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ class DlqPushHandlerTest {

private static final String KEY_PATH_PREFIX_VALUE = "dlq/";

private static final String LEGACY_MD5_CHECKSUM = "legacy_md5_checksum";

private static final boolean LEGACY_MD5_CHECKSUM_TRUE = true;
private static final boolean LEGACY_MD5_CHECKSUM_FALSE = false;

private static final String FORCE_PATH_STYLE = "force_path_style";

private static final boolean FORCE_PATH_STYLE_TRUE = true;
private static final boolean FORCE_PATH_STYLE_FALSE = false;

private static final String PIPELINE_NAME = "log-pipeline";

private static final String DLQ_FILE = "local_dlq_file";
Expand Down Expand Up @@ -84,6 +94,90 @@ void perform_for_dlq_s3_success() throws IOException {
verify(dlqWriter).write(anyList(), anyString(), anyString());
}

@Test
void perform_for_dlq_s3_success_legacymd5checksum_true() throws IOException {
Map<String, Object> props = new HashMap<>();
props.put(BUCKET,BUCKET_VALUE);
props.put(KEY_PATH_PREFIX,KEY_PATH_PREFIX_VALUE);
props.put(LEGACY_MD5_CHECKSUM,LEGACY_MD5_CHECKSUM_TRUE);

when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider);

when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter));
doNothing().when(dlqWriter).write(anyList(), anyString(), anyString());
SnsSinkFailedDlqData failedDlqData = new SnsSinkFailedDlqData("topic","message",0);
dlqPushHandler = new DlqPushHandler(null,pluginFactory, BUCKET_VALUE, ROLE, REGION,KEY_PATH_PREFIX_VALUE,LEGACY_MD5_CHECKSUM_TRUE);

PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, props);
pluginSetting.setPipelineName(PIPELINE_NAME);
dlqPushHandler.perform(pluginSetting, failedDlqData);
Assertions.assertNotNull(pluginFactory);
verify(dlqWriter).write(anyList(), anyString(), anyString());
}

@Test
void perform_for_dlq_s3_success_legacymd5checksum_false() throws IOException {
Map<String, Object> props = new HashMap<>();
props.put(BUCKET,BUCKET_VALUE);
props.put(KEY_PATH_PREFIX,KEY_PATH_PREFIX_VALUE);
props.put(LEGACY_MD5_CHECKSUM,LEGACY_MD5_CHECKSUM_FALSE);

when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider);

when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter));
doNothing().when(dlqWriter).write(anyList(), anyString(), anyString());
SnsSinkFailedDlqData failedDlqData = new SnsSinkFailedDlqData("topic","message",0);
dlqPushHandler = new DlqPushHandler(null,pluginFactory, BUCKET_VALUE, ROLE, REGION,KEY_PATH_PREFIX_VALUE,LEGACY_MD5_CHECKSUM_FALSE);

PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, props);
pluginSetting.setPipelineName(PIPELINE_NAME);
dlqPushHandler.perform(pluginSetting, failedDlqData);
Assertions.assertNotNull(pluginFactory);
verify(dlqWriter).write(anyList(), anyString(), anyString());
}

@Test
void perform_for_dlq_s3_success_forcepathstyle_true() throws IOException {
Map<String, Object> props = new HashMap<>();
props.put(BUCKET,BUCKET_VALUE);
props.put(KEY_PATH_PREFIX,KEY_PATH_PREFIX_VALUE);
props.put(FORCE_PATH_STYLE,FORCE_PATH_STYLE_TRUE);

when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider);

when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter));
doNothing().when(dlqWriter).write(anyList(), anyString(), anyString());
SnsSinkFailedDlqData failedDlqData = new SnsSinkFailedDlqData("topic","message",0);
dlqPushHandler = new DlqPushHandler(null,pluginFactory, BUCKET_VALUE, ROLE, REGION,KEY_PATH_PREFIX_VALUE,FORCE_PATH_STYLE_TRUE);

PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, props);
pluginSetting.setPipelineName(PIPELINE_NAME);
dlqPushHandler.perform(pluginSetting, failedDlqData);
Assertions.assertNotNull(pluginFactory);
verify(dlqWriter).write(anyList(), anyString(), anyString());
}

@Test
void perform_for_dlq_s3_success_forcepathstyle_false() throws IOException {
Map<String, Object> props = new HashMap<>();
props.put(BUCKET,BUCKET_VALUE);
props.put(KEY_PATH_PREFIX,KEY_PATH_PREFIX_VALUE);
props.put(FORCE_PATH_STYLE,FORCE_PATH_STYLE_FALSE);

when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider);

when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter));
doNothing().when(dlqWriter).write(anyList(), anyString(), anyString());
SnsSinkFailedDlqData failedDlqData = new SnsSinkFailedDlqData("topic","message",0);
dlqPushHandler = new DlqPushHandler(null,pluginFactory, BUCKET_VALUE, ROLE, REGION,KEY_PATH_PREFIX_VALUE,FORCE_PATH_STYLE_FALSE);

PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, props);
pluginSetting.setPipelineName(PIPELINE_NAME);
dlqPushHandler.perform(pluginSetting, failedDlqData);
Assertions.assertNotNull(pluginFactory);
verify(dlqWriter).write(anyList(), anyString(), anyString());
}


@Test
void perform_for_dlq_local_file_success(){
Expand Down
Loading