Skip to content

Commit 6800e61

Browse files
sb2k16sbose2k21
authored andcommitted
Add support for passing sts headers in kafka source (opensearch-project#6375)
* Add support for passing sts headers in kafka source Signed-off-by: Souvik Bose <souvbose@amazon.com> * Add test to cover valid header use case Signed-off-by: Souvik Bose <souvbose@amazon.com> * Minor code change for passing override config Signed-off-by: Souvik Bose <souvbose@amazon.com> * Add validation for tests Signed-off-by: Souvik Bose <souvbose@amazon.com> * Increase the test coverage. Signed-off-by: Souvik Bose <souvbose@amazon.com> --------- Signed-off-by: Souvik Bose <souvbose@amazon.com> Co-authored-by: Souvik Bose <souvbose@amazon.com> Signed-off-by: Simon ELBAZ <elbazsimon9@gmail.com>
1 parent b6ec88c commit 6800e61

5 files changed

Lines changed: 152 additions & 7 deletions

File tree

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsConfig.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import jakarta.validation.Valid;
1111
import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
1212

13+
import java.util.Map;
14+
1315
public class AwsConfig implements AwsCredentialsConfig {
1416

1517
public static class AwsMskConfig {
@@ -28,6 +30,7 @@ public String getArn() {
2830
public MskBrokerConnectionType getBrokerConnectionType() {
2931
return brokerConnectionType;
3032
}
33+
3134
}
3235

3336
@JsonProperty("msk")
@@ -43,6 +46,10 @@ public MskBrokerConnectionType getBrokerConnectionType() {
4346
@JsonProperty("sts_role_arn")
4447
private String stsRoleArn;
4548

49+
@JsonProperty("sts_header_overrides")
50+
@Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override")
51+
private Map<String, String> awsStsHeaderOverrides;
52+
4653
@JsonProperty("role_session_name")
4754
private String stsRoleSessionName;
4855

@@ -64,11 +71,16 @@ public String getStsRoleSessionName() {
6471
return stsRoleSessionName;
6572
}
6673

74+
public Map<String, String> getAwsStsHeaderOverrides() {
75+
return awsStsHeaderOverrides;
76+
}
77+
6778
@Override
6879
public AwsCredentialsOptions toCredentialsOptions() {
6980
return AwsCredentialsOptions.builder()
7081
.withRegion(region)
7182
.withStsRoleArn(stsRoleArn)
83+
.withStsHeaderOverrides(awsStsHeaderOverrides)
7284
.build();
7385
}
7486
}

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -255,16 +255,21 @@ private static void configureMSKCredentialsProvider(final AuthConfig authConfig,
255255
.region(Region.of(awsConfig.getRegion()))
256256
.credentialsProvider(mskCredentialsProvider)
257257
.build();
258+
AssumeRoleRequest.Builder assumeRequestBuilder = AssumeRoleRequest
259+
.builder()
260+
.roleArn(awsConfig.getStsRoleArn())
261+
.roleSessionName(sessionName);
262+
Map<String, String> headers = awsConfig.getAwsStsHeaderOverrides();
263+
if (Objects.nonNull(headers)) {
264+
assumeRequestBuilder.overrideConfiguration(configuration -> {
265+
headers.forEach(configuration::putHeader);
266+
});
267+
}
258268
mskCredentialsProvider = StsAssumeRoleCredentialsProvider
259269
.builder()
260270
.stsClient(stsClient)
261-
.refreshRequest(
262-
AssumeRoleRequest
263-
.builder()
264-
.roleArn(awsConfig.getStsRoleArn())
265-
.roleSessionName(sessionName)
266-
.build()
267-
).build();
271+
.refreshRequest(assumeRequestBuilder.build())
272+
.build();
268273
}
269274
}
270275

data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsConfigTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.junit.jupiter.api.Test;
1010

1111
import java.lang.reflect.Field;
12+
import java.util.Map;
1213

1314
import static org.hamcrest.CoreMatchers.equalTo;
1415
import static org.hamcrest.MatcherAssert.assertThat;
@@ -36,6 +37,53 @@ void TestConfigOptions_notNull() throws NoSuchFieldException, IllegalAccessExcep
3637
final String testRegion = RandomStringUtils.randomAlphabetic(8);
3738
reflectivelySetField(awsConfig, "region", testRegion);
3839
assertThat(awsConfig.getRegion(), equalTo(testRegion));
40+
41+
final Map<String, String> testStsHeaderOverrides = Map.of("header1", "value1", "header2", "value2");
42+
reflectivelySetField(awsConfig, "awsStsHeaderOverrides", testStsHeaderOverrides);
43+
assertThat(awsConfig.getAwsStsHeaderOverrides(), equalTo(testStsHeaderOverrides));
44+
}
45+
46+
@Test
47+
void testStsHeaderOverridesValidation_hasMaxSizeConstraint() throws NoSuchFieldException {
48+
// Verify that the sts_header_overrides field has the @Size(max = 5) validation annotation
49+
final Field field = AwsConfig.class.getDeclaredField("awsStsHeaderOverrides");
50+
final jakarta.validation.constraints.Size sizeAnnotation = field.getAnnotation(jakarta.validation.constraints.Size.class);
51+
52+
assertThat("sts_header_overrides field should have @Size annotation", sizeAnnotation != null, equalTo(true));
53+
assertThat("sts_header_overrides should have max size of 5", sizeAnnotation.max(), equalTo(5));
54+
assertThat("sts_header_overrides validation message should be correct",
55+
sizeAnnotation.message(), equalTo("sts_header_overrides supports a maximum of 5 headers to override"));
56+
}
57+
58+
@Test
59+
void testToCredentialsOptions_withoutStsHeaderOverrides() throws NoSuchFieldException, IllegalAccessException {
60+
final String testRegion = "us-east-1";
61+
final String testStsRoleArn = "arn:aws:iam::123456789012:role/test-role";
62+
63+
reflectivelySetField(awsConfig, "region", testRegion);
64+
reflectivelySetField(awsConfig, "stsRoleArn", testStsRoleArn);
65+
66+
final org.opensearch.dataprepper.aws.api.AwsCredentialsOptions result = awsConfig.toCredentialsOptions();
67+
68+
assertThat(result.getRegion().toString(), equalTo(testRegion));
69+
assertThat(result.getStsRoleArn(), equalTo(testStsRoleArn));
70+
}
71+
72+
@Test
73+
void testToCredentialsOptions_withStsHeaderOverrides() throws NoSuchFieldException, IllegalAccessException {
74+
final String testRegion = "us-east-1";
75+
final String testStsRoleArn = "arn:aws:iam::123456789012:role/test-role";
76+
77+
reflectivelySetField(awsConfig, "region", testRegion);
78+
reflectivelySetField(awsConfig, "stsRoleArn", testStsRoleArn);
79+
final Map<String, String> testStsHeaderOverrides = Map.of("header1", "value1", "header2", "value2");
80+
reflectivelySetField(awsConfig, "awsStsHeaderOverrides", testStsHeaderOverrides);
81+
82+
final org.opensearch.dataprepper.aws.api.AwsCredentialsOptions result = awsConfig.toCredentialsOptions();
83+
84+
assertThat(result.getRegion().toString(), equalTo(testRegion));
85+
assertThat(result.getStsRoleArn(), equalTo(testStsRoleArn));
86+
assertThat(result.getStsHeaderOverrides(), equalTo(testStsHeaderOverrides));
3987
}
4088

4189
private void reflectivelySetField(final AwsConfig awsConfig, final String fieldName, final Object value) throws NoSuchFieldException, IllegalAccessException {

data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurerTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@
2525
import org.slf4j.LoggerFactory;
2626
import org.yaml.snakeyaml.Yaml;
2727
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
28+
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
2829
import software.amazon.awssdk.regions.Region;
2930
import software.amazon.awssdk.services.kafka.KafkaClient;
3031
import software.amazon.awssdk.services.kafka.KafkaClientBuilder;
3132
import software.amazon.awssdk.services.kafka.model.GetBootstrapBrokersRequest;
3233
import software.amazon.awssdk.services.kafka.model.GetBootstrapBrokersResponse;
3334
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
35+
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
3436

3537
import java.io.FileReader;
3638
import java.io.IOException;
@@ -43,11 +45,13 @@
4345

4446
import static org.apache.kafka.common.config.SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS;
4547
import static org.hamcrest.CoreMatchers.equalTo;
48+
import static org.hamcrest.CoreMatchers.hasItem;
4649
import static org.hamcrest.CoreMatchers.instanceOf;
4750
import static org.hamcrest.CoreMatchers.notNullValue;
4851
import static org.hamcrest.CoreMatchers.nullValue;
4952
import static org.hamcrest.MatcherAssert.assertThat;
5053
import static org.hamcrest.CoreMatchers.is;
54+
import static org.hamcrest.Matchers.hasKey;
5155
import static org.mockito.ArgumentMatchers.any;
5256
import static org.mockito.Mockito.mock;
5357
import static org.mockito.Mockito.mockStatic;
@@ -347,6 +351,61 @@ void testSetDynamicSaslClientCallbackHandlerWithNullAuthConfig() {
347351
verifyNoInteractions(pluginConfigObservable);
348352
}
349353

354+
@Test
355+
void testSetAuthPropertiesWithStsHeaderOverrides() throws IOException {
356+
final Properties props = new Properties();
357+
final KafkaSourceConfig kafkaSourceConfig = createKafkaSinkConfig("kafka-pipeline-bootstrap-servers-sasl-iam-role.yaml");
358+
359+
try (MockedStatic<StsAssumeRoleCredentialsProvider> mockedProvider = mockStatic(StsAssumeRoleCredentialsProvider.class)) {
360+
final StsAssumeRoleCredentialsProvider.Builder mockBuilder = mock(StsAssumeRoleCredentialsProvider.Builder.class);
361+
when(mockBuilder.stsClient(any())).thenReturn(mockBuilder);
362+
when(mockBuilder.refreshRequest(any(AssumeRoleRequest.class))).thenReturn(mockBuilder);
363+
when(mockBuilder.build()).thenReturn(stsAssumeRoleCredentialsProvider);
364+
mockedProvider.when(StsAssumeRoleCredentialsProvider::builder).thenReturn(mockBuilder);
365+
366+
KafkaSecurityConfigurer.setAuthProperties(props, kafkaSourceConfig, LOG);
367+
368+
verify(mockBuilder).refreshRequest(any(AssumeRoleRequest.class));
369+
}
370+
}
371+
372+
@Test
373+
void testSetAuthPropertiesWithStsHeaderOverridesConfigured() throws IOException {
374+
final Properties props = new Properties();
375+
final KafkaSourceConfig kafkaSourceConfig = createKafkaSinkConfig("kafka-pipeline-bootstrap-servers-sasl-iam-role-with-headers.yaml");
376+
377+
try (MockedStatic<StsAssumeRoleCredentialsProvider> mockedProvider = mockStatic(StsAssumeRoleCredentialsProvider.class)) {
378+
final StsAssumeRoleCredentialsProvider.Builder stsCredentialsProviderBuilder = mock(StsAssumeRoleCredentialsProvider.Builder.class);
379+
when(stsCredentialsProviderBuilder.stsClient(any())).thenReturn(stsCredentialsProviderBuilder);
380+
when(stsCredentialsProviderBuilder.refreshRequest(any(AssumeRoleRequest.class))).thenReturn(stsCredentialsProviderBuilder);
381+
when(stsCredentialsProviderBuilder.build()).thenReturn(stsAssumeRoleCredentialsProvider);
382+
mockedProvider.when(StsAssumeRoleCredentialsProvider::builder).thenReturn(stsCredentialsProviderBuilder);
383+
384+
KafkaSecurityConfigurer.setAuthProperties(props, kafkaSourceConfig, LOG);
385+
386+
final ArgumentCaptor<AssumeRoleRequest> assumeRoleRequestArgumentCaptor = ArgumentCaptor.forClass(AssumeRoleRequest.class);
387+
verify(stsCredentialsProviderBuilder).refreshRequest(assumeRoleRequestArgumentCaptor.capture());
388+
final AssumeRoleRequest actualAssumeRoleRequest = assumeRoleRequestArgumentCaptor.getValue();
389+
assertThat(actualAssumeRoleRequest.overrideConfiguration(), notNullValue());
390+
assertThat(actualAssumeRoleRequest.overrideConfiguration().isPresent(), equalTo(true));
391+
final AwsRequestOverrideConfiguration overrideConfiguration = actualAssumeRoleRequest.overrideConfiguration().get();
392+
assertThat(overrideConfiguration.headers(), notNullValue());
393+
assertThat(overrideConfiguration.headers().size(), equalTo(2));
394+
final String headerName1 = "X-Custom-Header";
395+
final String headerValue1 = "custom-value";
396+
final String headerName2 = "X-Another-Header";
397+
final String headerValue2 = "another-value";
398+
assertThat(overrideConfiguration.headers(), hasKey(headerName1));
399+
assertThat(overrideConfiguration.headers(), hasKey(headerName2));
400+
assertThat(overrideConfiguration.headers().get(headerName1), notNullValue());
401+
assertThat(overrideConfiguration.headers().get(headerName1).size(), equalTo(1));
402+
assertThat(overrideConfiguration.headers().get(headerName1), hasItem(headerValue1));
403+
assertThat(overrideConfiguration.headers().get(headerName2), notNullValue());
404+
assertThat(overrideConfiguration.headers().get(headerName2).size(), equalTo(1));
405+
assertThat(overrideConfiguration.headers().get(headerName2), hasItem(headerValue2));
406+
}
407+
}
408+
350409
private KafkaSourceConfig createKafkaSinkConfig(final String fileName) throws IOException {
351410
final Yaml yaml = new Yaml();
352411
final FileReader fileReader = new FileReader(Objects.requireNonNull(getClass().getClassLoader()
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
log-pipeline :
2+
source:
3+
kafka:
4+
bootstrap_servers:
5+
- "localhost:9092"
6+
encryption:
7+
type: "SSL"
8+
authentication:
9+
sasl:
10+
aws_msk_iam: role
11+
aws:
12+
region: us-east-2
13+
sts_role_arn: test_sasl_iam_sts_role
14+
sts_header_overrides:
15+
X-Custom-Header: custom-value
16+
X-Another-Header: another-value
17+
topics:
18+
- name: "quickstart-events"
19+
group_id: "groupdID1"
20+
sink:
21+
stdout:

0 commit comments

Comments
 (0)