Skip to content

Commit 9377b2f

Browse files
committed
Added AWS SDK metrics to s3 and sqs clients via micrometer
- Added common AWSSdkMetrics class so it can be reused in different modules - Added integration tests to ensure remote calls generate metrics prefixed with the service name Signed-off-by: Manuel Mangas Zurita <mzurita@amazon.com>
1 parent e53033d commit 9377b2f

6 files changed

Lines changed: 183 additions & 0 deletions

File tree

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package org.opensearch.dataprepper.plugins.source.s3;
6+
7+
import io.micrometer.core.instrument.MeterRegistry;
8+
import io.micrometer.core.instrument.Metrics;
9+
import org.junit.jupiter.api.BeforeEach;
10+
import org.junit.jupiter.api.Test;
11+
import org.opensearch.dataprepper.core.meter.EMFLoggingMeterRegistry;
12+
import org.opensearch.dataprepper.plugins.source.s3.configuration.AwsAuthenticationOptions;
13+
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
14+
import software.amazon.awssdk.regions.Region;
15+
import software.amazon.awssdk.services.s3.S3Client;
16+
import software.amazon.awssdk.services.s3.model.ListBucketsRequest;
17+
18+
import static org.junit.jupiter.api.Assertions.assertTrue;
19+
import static org.mockito.Mockito.mock;
20+
import static org.mockito.Mockito.when;
21+
22+
class S3MetricsIT {
23+
24+
private MeterRegistry meterRegistry;
25+
26+
@BeforeEach
27+
void setUp() {
28+
meterRegistry = new EMFLoggingMeterRegistry();
29+
Metrics.globalRegistry.clear();
30+
Metrics.addRegistry(meterRegistry);
31+
}
32+
33+
@Test
34+
void testS3ClientBuilderFactoryGeneratesMetrics() throws InterruptedException {
35+
final S3SourceConfig s3SourceConfig = mock(S3SourceConfig.class);
36+
final AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class);
37+
when(s3SourceConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions);
38+
when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1);
39+
40+
final S3ClientBuilderFactory factory = new S3ClientBuilderFactory(s3SourceConfig, DefaultCredentialsProvider.create());
41+
final S3Client s3Client = factory.getS3Client();
42+
43+
try {
44+
s3Client.listBuckets(ListBucketsRequest.builder().build());
45+
} catch (Exception ignored) {
46+
}
47+
48+
// Wait for EMF registry to publish metrics
49+
Thread.sleep(2000);
50+
51+
assertTrue(meterRegistry.getMeters().stream()
52+
.anyMatch(meter -> meter.getId().getName().startsWith("aws.s3.")),
53+
"Expected S3 metrics to be generated");
54+
55+
s3Client.close();
56+
}
57+
}

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ClientBuilderFactory.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
*/
55
package org.opensearch.dataprepper.plugins.source.s3;
66

7+
import org.opensearch.dataprepper.metrics.PluginMetrics;
8+
import org.opensearch.dataprepper.plugins.metricpublisher.MicrometerMetricPublisher;
79
import org.slf4j.Logger;
810
import org.slf4j.LoggerFactory;
911
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -23,24 +25,28 @@ public class S3ClientBuilderFactory {
2325
private final AwsCredentialsProvider credentialsProvider;
2426
private final S3Client s3Client;
2527
private final S3AsyncClient s3AsyncClient;
28+
2629
public S3ClientBuilderFactory(final S3SourceConfig s3SourceConfig, AwsCredentialsProvider credentialsProvider){
2730
this.s3SourceConfig = s3SourceConfig;
2831
this.credentialsProvider = credentialsProvider;
2932
this.s3Client = createS3Client();
3033
this.s3AsyncClient = createS3AsyncClient();
3134
}
35+
3236
/**
3337
* Create a S3Client Object for download the s3 Objects
3438
* @return a S3Client Object
3539
*/
3640
public S3Client createS3Client() {
3741
LOG.info("Creating S3 client");
42+
final PluginMetrics awsSdkMetrics = PluginMetrics.fromNames("sdk", "aws");
3843
return S3Client.builder()
3944
.region(s3SourceConfig.getAwsAuthenticationOptions().getAwsRegion())
4045
.crossRegionAccessEnabled(true)
4146
.credentialsProvider(credentialsProvider)
4247
.overrideConfiguration(ClientOverrideConfiguration.builder()
4348
.retryPolicy(retryPolicy -> retryPolicy.numRetries(5).build())
49+
.addMetricPublisher(new MicrometerMetricPublisher(awsSdkMetrics))
4450
.build())
4551
.build();
4652
}
@@ -51,6 +57,7 @@ public S3Client createS3Client() {
5157
*/
5258
public S3AsyncClient createS3AsyncClient() {
5359
LOG.info("Creating S3 Async client");
60+
final PluginMetrics awsSdkMetrics = PluginMetrics.fromNames("sdk", "aws");
5461
return S3AsyncClient.builder()
5562
.region(s3SourceConfig.getAwsAuthenticationOptions().getAwsRegion())
5663
.crossRegionAccessEnabled(true)
@@ -60,6 +67,7 @@ public S3AsyncClient createS3AsyncClient() {
6067
.credentialsProvider(credentialsProvider)
6168
.overrideConfiguration(ClientOverrideConfiguration.builder()
6269
.retryPolicy(retryPolicy -> retryPolicy.numRetries(5).build())
70+
.addMetricPublisher(new MicrometerMetricPublisher(awsSdkMetrics))
6371
.build())
6472
.build();
6573
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package org.opensearch.dataprepper.plugins.source.s3;
6+
7+
import org.junit.jupiter.api.Test;
8+
import org.opensearch.dataprepper.plugins.source.s3.configuration.AwsAuthenticationOptions;
9+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
10+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
11+
import software.amazon.awssdk.regions.Region;
12+
import software.amazon.awssdk.services.s3.S3Client;
13+
14+
import static org.junit.jupiter.api.Assertions.assertNotNull;
15+
import static org.mockito.Mockito.mock;
16+
import static org.mockito.Mockito.when;
17+
18+
class S3ClientBuilderFactoryTest {
19+
20+
@Test
21+
void testS3ClientBuilderFactoryCreatesClientsWithMicrometerIntegration() {
22+
final StaticCredentialsProvider credentialsProvider =
23+
StaticCredentialsProvider.create(AwsBasicCredentials.create("testKey", "testSecret"));
24+
25+
final S3SourceConfig s3SourceConfig = mock(S3SourceConfig.class);
26+
final AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class);
27+
when(s3SourceConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions);
28+
when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1);
29+
30+
final S3ClientBuilderFactory factory = new S3ClientBuilderFactory(s3SourceConfig, credentialsProvider);
31+
32+
final S3Client s3Client = factory.getS3Client();
33+
assertNotNull(s3Client, "S3Client should not be null");
34+
35+
assertNotNull(factory.getS3AsyncClient(), "S3AsyncClient should not be null");
36+
}
37+
}

data-prepper-plugins/sqs-common/build.gradle

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,20 @@ plugins {
77
id 'java'
88
}
99

10+
sourceSets {
11+
integrationTest {
12+
java {
13+
compileClasspath += sourceSets.main.output
14+
runtimeClasspath += sourceSets.main.output
15+
}
16+
}
17+
}
18+
19+
configurations {
20+
integrationTestImplementation.extendsFrom implementation
21+
integrationTestRuntimeOnly.extendsFrom runtimeOnly
22+
}
23+
1024
dependencies {
1125
implementation project(':data-prepper-api')
1226
implementation project(':data-prepper-plugins:buffer-common')
@@ -22,7 +36,18 @@ dependencies {
2236
implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final'
2337
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
2438
testImplementation project(':data-prepper-plugins:blocking-buffer')
39+
integrationTestImplementation project(':data-prepper-core')
40+
integrationTestImplementation 'io.micrometer:micrometer-core'
41+
integrationTestImplementation 'org.junit.jupiter:junit-jupiter-api'
42+
integrationTestRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
2543
}
44+
2645
test {
2746
useJUnitPlatform()
2847
}
48+
49+
task integrationTest(type: Test) {
50+
testClassesDirs = sourceSets.integrationTest.output.classesDirs
51+
classpath = sourceSets.integrationTest.runtimeClasspath
52+
useJUnitPlatform()
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package org.opensearch.dataprepper.plugins.source.sqs.common;
6+
7+
import io.micrometer.core.instrument.MeterRegistry;
8+
import io.micrometer.core.instrument.Metrics;
9+
import org.junit.jupiter.api.BeforeEach;
10+
import org.junit.jupiter.api.Test;
11+
import org.opensearch.dataprepper.core.meter.EMFLoggingMeterRegistry;
12+
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
13+
import software.amazon.awssdk.regions.Region;
14+
import software.amazon.awssdk.services.sqs.SqsClient;
15+
import software.amazon.awssdk.services.sqs.model.ListQueuesRequest;
16+
17+
import static org.junit.jupiter.api.Assertions.assertTrue;
18+
19+
class SqsMetricsIT {
20+
21+
private MeterRegistry meterRegistry;
22+
23+
@BeforeEach
24+
void setUp() {
25+
meterRegistry = new EMFLoggingMeterRegistry();
26+
Metrics.globalRegistry.clear();
27+
Metrics.addRegistry(meterRegistry);
28+
}
29+
30+
@Test
31+
void testSqsClientFactoryGeneratesMetrics() throws InterruptedException {
32+
final SqsClient sqsClient = SqsClientFactory.createSqsClient(
33+
Region.US_EAST_1,
34+
DefaultCredentialsProvider.create()
35+
);
36+
37+
try {
38+
sqsClient.listQueues(ListQueuesRequest.builder().build());
39+
} catch (Exception ignored) {
40+
}
41+
42+
// Wait for EMF registry to publish metrics
43+
Thread.sleep(2000);
44+
45+
assertTrue(meterRegistry.getMeters().stream()
46+
.anyMatch(meter -> meter.getId().getName().startsWith("aws.sqs.")),
47+
"Expected SQS metrics to be generated");
48+
49+
sqsClient.close();
50+
}
51+
}

data-prepper-plugins/sqs-common/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/common/SqsClientFactory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
package org.opensearch.dataprepper.plugins.source.sqs.common;
1212

13+
import org.opensearch.dataprepper.metrics.PluginMetrics;
14+
import org.opensearch.dataprepper.plugins.metricpublisher.MicrometerMetricPublisher;
1315
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
1416
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
1517
import software.amazon.awssdk.core.retry.RetryPolicy;
@@ -28,11 +30,14 @@ public static SqsClient createSqsClient(
2830
final Region region,
2931
final AwsCredentialsProvider credentialsProvider) {
3032

33+
final PluginMetrics awsSdkMetrics = PluginMetrics.fromNames("sdk", "aws");
34+
3135
return SqsClient.builder()
3236
.region(region)
3337
.credentialsProvider(credentialsProvider)
3438
.overrideConfiguration(ClientOverrideConfiguration.builder()
3539
.retryPolicy(RetryPolicy.builder().numRetries(5).build())
40+
.addMetricPublisher(new MicrometerMetricPublisher(awsSdkMetrics))
3641
.build())
3742
.build();
3843
}

0 commit comments

Comments
 (0)