Skip to content

Commit 2b985a4

Browse files
committed
Use real CloudWatchMeterRegistry for integration tests
- Replaced EMFLoggingMeterRegistry with CloudWatchMeterRegistry from micrometer-cloudwatch2 - Uses the same CloudWatch publisher that Data Prepper uses in production - Metrics will be sent directly to CloudWatch Metrics in DataPrepper namespace - Tests demonstrate real CloudWatch integration with proper AWS SDK metrics Signed-off-by: Manuel Mangas Zurita <mzurita@amazon.com>
1 parent d90d510 commit 2b985a4

4 files changed

Lines changed: 76 additions & 20 deletions

File tree

data-prepper-plugins/s3-source/build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ dependencies {
3434
implementation 'org.apache.httpcomponents:httpcore:4.4.16'
3535
testImplementation libs.commons.lang3
3636
testImplementation 'org.wiremock:wiremock:3.13.1'
37-
testImplementation 'software.amazon.awssdk:cloudwatchlogs'
37+
testImplementation 'io.micrometer:micrometer-registry-cloudwatch2'
38+
testImplementation 'software.amazon.awssdk:cloudwatch'
3839
testImplementation 'org.eclipse.jetty:jetty-bom:11.0.25'
3940
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
4041
testImplementation testLibs.junit.vintage

data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3MetricsIT.java

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,22 @@
44
*/
55
package org.opensearch.dataprepper.plugins.source.s3;
66

7+
import io.micrometer.cloudwatch2.CloudWatchConfig;
8+
import io.micrometer.cloudwatch2.CloudWatchMeterRegistry;
9+
import io.micrometer.core.instrument.Clock;
710
import io.micrometer.core.instrument.MeterRegistry;
811
import io.micrometer.core.instrument.Metrics;
912
import org.junit.jupiter.api.BeforeEach;
1013
import org.junit.jupiter.api.Test;
11-
import org.opensearch.dataprepper.core.meter.EMFLoggingMeterRegistry;
1214
import org.opensearch.dataprepper.plugins.source.s3.configuration.AwsAuthenticationOptions;
1315
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
1416
import software.amazon.awssdk.regions.Region;
17+
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
1518
import software.amazon.awssdk.services.s3.S3Client;
1619
import software.amazon.awssdk.services.s3.model.ListBucketsRequest;
1720

21+
import java.time.Duration;
22+
1823
import static org.junit.jupiter.api.Assertions.assertTrue;
1924
import static org.mockito.Mockito.mock;
2025
import static org.mockito.Mockito.when;
@@ -25,13 +30,35 @@ class S3MetricsIT {
2530

2631
@BeforeEach
2732
void setUp() {
28-
meterRegistry = new EMFLoggingMeterRegistry();
33+
final CloudWatchAsyncClient cloudWatchAsyncClient = CloudWatchAsyncClient.builder()
34+
.region(Region.US_EAST_1)
35+
.credentialsProvider(DefaultCredentialsProvider.create())
36+
.build();
37+
38+
final CloudWatchConfig cloudWatchConfig = new CloudWatchConfig() {
39+
@Override
40+
public String get(String key) {
41+
return null;
42+
}
43+
44+
@Override
45+
public String namespace() {
46+
return "DataPrepper";
47+
}
48+
49+
@Override
50+
public Duration step() {
51+
return Duration.ofSeconds(60);
52+
}
53+
};
54+
55+
meterRegistry = new CloudWatchMeterRegistry(cloudWatchConfig, Clock.SYSTEM, cloudWatchAsyncClient);
2956
Metrics.globalRegistry.clear();
3057
Metrics.addRegistry(meterRegistry);
3158
}
3259

3360
@Test
34-
void testS3ClientBuilderFactoryGeneratesEMFMetrics() throws InterruptedException {
61+
void testS3ClientBuilderFactoryGeneratesCloudWatchMetrics() throws InterruptedException {
3562
final S3SourceConfig s3SourceConfig = mock(S3SourceConfig.class);
3663
final AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class);
3764
when(s3SourceConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions);
@@ -45,24 +72,24 @@ void testS3ClientBuilderFactoryGeneratesEMFMetrics() throws InterruptedException
4572
} catch (Exception ignored) {
4673
}
4774

48-
// Wait for EMF registry to process metrics
75+
// Wait for CloudWatch registry to process metrics
4976
Thread.sleep(3000);
5077

51-
// Verify AWS SDK metrics were generated in EMF format
78+
// Verify AWS SDK metrics were generated for CloudWatch
5279
boolean hasAwsSdkMetrics = meterRegistry.getMeters().stream()
5380
.anyMatch(meter -> meter.getId().getName().startsWith("aws.sdk."));
5481

55-
System.out.println("=== EMF Metrics Generated for CloudWatch ===");
82+
System.out.println("=== CloudWatch Metrics Generated ===");
5683
System.out.println("Total metrics: " + meterRegistry.getMeters().size());
5784
meterRegistry.getMeters().stream()
5885
.filter(meter -> meter.getId().getName().startsWith("aws.sdk."))
5986
.limit(10)
60-
.forEach(meter -> System.out.println("EMF Metric: " + meter.getId().getName() +
87+
.forEach(meter -> System.out.println("CloudWatch Metric: " + meter.getId().getName() +
6188
" Tags: " + meter.getId().getTags()));
62-
System.out.println("=== These metrics will appear in CloudWatch ===");
89+
System.out.println("=== These metrics will be sent to CloudWatch ===");
6390

6491
assertTrue(hasAwsSdkMetrics,
65-
"Expected AWS SDK metrics to be generated in EMF format for CloudWatch");
92+
"Expected AWS SDK metrics to be generated for CloudWatch");
6693

6794
s3Client.close();
6895
}

data-prepper-plugins/sqs-common/build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ dependencies {
3838
testImplementation project(':data-prepper-plugins:blocking-buffer')
3939
integrationTestImplementation project(':data-prepper-core')
4040
integrationTestImplementation 'io.micrometer:micrometer-core'
41-
integrationTestImplementation 'software.amazon.awssdk:cloudwatchlogs'
41+
integrationTestImplementation 'io.micrometer:micrometer-registry-cloudwatch2'
42+
integrationTestImplementation 'software.amazon.awssdk:cloudwatch'
4243
integrationTestImplementation 'org.junit.jupiter:junit-jupiter-api'
4344
integrationTestRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
4445
}

data-prepper-plugins/sqs-common/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/sqs/common/SqsMetricsIT.java

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,21 @@
44
*/
55
package org.opensearch.dataprepper.plugins.source.sqs.common;
66

7+
import io.micrometer.cloudwatch2.CloudWatchConfig;
8+
import io.micrometer.cloudwatch2.CloudWatchMeterRegistry;
9+
import io.micrometer.core.instrument.Clock;
710
import io.micrometer.core.instrument.MeterRegistry;
811
import io.micrometer.core.instrument.Metrics;
912
import org.junit.jupiter.api.BeforeEach;
1013
import org.junit.jupiter.api.Test;
11-
import org.opensearch.dataprepper.core.meter.EMFLoggingMeterRegistry;
1214
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
1315
import software.amazon.awssdk.regions.Region;
16+
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
1417
import software.amazon.awssdk.services.sqs.SqsClient;
1518
import software.amazon.awssdk.services.sqs.model.ListQueuesRequest;
1619

20+
import java.time.Duration;
21+
1722
import static org.junit.jupiter.api.Assertions.assertTrue;
1823

1924
class SqsMetricsIT {
@@ -22,13 +27,35 @@ class SqsMetricsIT {
2227

2328
@BeforeEach
2429
void setUp() {
25-
meterRegistry = new EMFLoggingMeterRegistry();
30+
final CloudWatchAsyncClient cloudWatchAsyncClient = CloudWatchAsyncClient.builder()
31+
.region(Region.US_EAST_1)
32+
.credentialsProvider(DefaultCredentialsProvider.create())
33+
.build();
34+
35+
final CloudWatchConfig cloudWatchConfig = new CloudWatchConfig() {
36+
@Override
37+
public String get(String key) {
38+
return null;
39+
}
40+
41+
@Override
42+
public String namespace() {
43+
return "DataPrepper";
44+
}
45+
46+
@Override
47+
public Duration step() {
48+
return Duration.ofSeconds(60);
49+
}
50+
};
51+
52+
meterRegistry = new CloudWatchMeterRegistry(cloudWatchConfig, Clock.SYSTEM, cloudWatchAsyncClient);
2653
Metrics.globalRegistry.clear();
2754
Metrics.addRegistry(meterRegistry);
2855
}
2956

3057
@Test
31-
void testSqsClientFactoryGeneratesEMFMetrics() throws InterruptedException {
58+
void testSqsClientFactoryGeneratesCloudWatchMetrics() throws InterruptedException {
3259
final SqsClient sqsClient = SqsClientFactory.createSqsClient(
3360
Region.US_EAST_1,
3461
DefaultCredentialsProvider.create()
@@ -40,24 +67,24 @@ void testSqsClientFactoryGeneratesEMFMetrics() throws InterruptedException {
4067
} catch (Exception ignored) {
4168
}
4269

43-
// Wait for EMF registry to process metrics
70+
// Wait for CloudWatch registry to process metrics
4471
Thread.sleep(3000);
4572

46-
// Verify AWS SDK metrics were generated in EMF format
73+
// Verify AWS SDK metrics were generated for CloudWatch
4774
boolean hasAwsSdkMetrics = meterRegistry.getMeters().stream()
4875
.anyMatch(meter -> meter.getId().getName().startsWith("aws.sdk."));
4976

50-
System.out.println("=== EMF Metrics Generated for CloudWatch ===");
77+
System.out.println("=== CloudWatch Metrics Generated ===");
5178
System.out.println("Total metrics: " + meterRegistry.getMeters().size());
5279
meterRegistry.getMeters().stream()
5380
.filter(meter -> meter.getId().getName().startsWith("aws.sdk."))
5481
.limit(10)
55-
.forEach(meter -> System.out.println("EMF Metric: " + meter.getId().getName() +
82+
.forEach(meter -> System.out.println("CloudWatch Metric: " + meter.getId().getName() +
5683
" Tags: " + meter.getId().getTags()));
57-
System.out.println("=== These metrics will appear in CloudWatch ===");
84+
System.out.println("=== These metrics will be sent to CloudWatch ===");
5885

5986
assertTrue(hasAwsSdkMetrics,
60-
"Expected AWS SDK metrics to be generated in EMF format for CloudWatch");
87+
"Expected AWS SDK metrics to be generated for CloudWatch");
6188

6289
sqsClient.close();
6390
}

0 commit comments

Comments
 (0)