diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3MetricsIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3MetricsIT.java new file mode 100644 index 0000000000..34ff17052f --- /dev/null +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3MetricsIT.java @@ -0,0 +1,57 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source.s3; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Metrics; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.core.meter.EMFLoggingMeterRegistry; +import org.opensearch.dataprepper.plugins.source.s3.configuration.AwsAuthenticationOptions; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.ListBucketsRequest; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class S3MetricsIT { + + private MeterRegistry meterRegistry; + + @BeforeEach + void setUp() { + meterRegistry = new EMFLoggingMeterRegistry(); + Metrics.globalRegistry.clear(); + Metrics.addRegistry(meterRegistry); + } + + @Test + void testS3ClientBuilderFactoryGeneratesMetrics() throws InterruptedException { + final S3SourceConfig s3SourceConfig = mock(S3SourceConfig.class); + final AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class); + when(s3SourceConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1); + + final S3ClientBuilderFactory factory = new S3ClientBuilderFactory(s3SourceConfig, DefaultCredentialsProvider.create()); + final S3Client s3Client = factory.getS3Client(); + + try { + s3Client.listBuckets(ListBucketsRequest.builder().build()); + } catch (Exception ignored) { + } + + // Wait for EMF registry to publish metrics + Thread.sleep(2000); + + assertTrue(meterRegistry.getMeters().stream() + .anyMatch(meter -> meter.getId().getName().startsWith("aws.s3.")), + "Expected S3 metrics to be generated"); + + s3Client.close(); + } +} diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ClientBuilderFactory.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ClientBuilderFactory.java index 3974196018..01e0be44d5 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ClientBuilderFactory.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ClientBuilderFactory.java @@ -4,6 +4,8 @@ */ package org.opensearch.dataprepper.plugins.source.s3; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.plugins.metricpublisher.MicrometerMetricPublisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -23,24 +25,28 @@ public class S3ClientBuilderFactory { private final AwsCredentialsProvider credentialsProvider; private final S3Client s3Client; private final S3AsyncClient s3AsyncClient; + public S3ClientBuilderFactory(final S3SourceConfig s3SourceConfig, AwsCredentialsProvider credentialsProvider){ this.s3SourceConfig = s3SourceConfig; this.credentialsProvider = credentialsProvider; this.s3Client = createS3Client(); this.s3AsyncClient = createS3AsyncClient(); } + /** * Create a S3Client Object for download the s3 Objects * @return a S3Client Object */ public S3Client createS3Client() { LOG.info("Creating S3 client"); + final PluginMetrics awsSdkMetrics = PluginMetrics.fromNames("sdk", "aws"); return S3Client.builder() .region(s3SourceConfig.getAwsAuthenticationOptions().getAwsRegion()) .crossRegionAccessEnabled(true) .credentialsProvider(credentialsProvider) .overrideConfiguration(ClientOverrideConfiguration.builder() .retryPolicy(retryPolicy -> retryPolicy.numRetries(5).build()) + .addMetricPublisher(new MicrometerMetricPublisher(awsSdkMetrics)) .build()) .build(); } @@ -51,6 +57,7 @@ public S3Client createS3Client() { */ public S3AsyncClient createS3AsyncClient() { LOG.info("Creating S3 Async client"); + final PluginMetrics awsSdkMetrics = PluginMetrics.fromNames("sdk", "aws"); return S3AsyncClient.builder() .region(s3SourceConfig.getAwsAuthenticationOptions().getAwsRegion()) .crossRegionAccessEnabled(true) @@ -60,6 +67,7 @@ public S3AsyncClient createS3AsyncClient() { .credentialsProvider(credentialsProvider) .overrideConfiguration(ClientOverrideConfiguration.builder() .retryPolicy(retryPolicy -> retryPolicy.numRetries(5).build()) + .addMetricPublisher(new MicrometerMetricPublisher(awsSdkMetrics)) .build()) .build(); } diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ClientBuilderFactoryTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ClientBuilderFactoryTest.java new file mode 100644 index 0000000000..42c092032f --- /dev/null +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ClientBuilderFactoryTest.java @@ -0,0 +1,37 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source.s3; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.source.s3.configuration.AwsAuthenticationOptions; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class S3ClientBuilderFactoryTest { + + @Test + void testS3ClientBuilderFactoryCreatesClientsWithMicrometerIntegration() { + final StaticCredentialsProvider credentialsProvider = + StaticCredentialsProvider.create(AwsBasicCredentials.create("testKey", "testSecret")); + + final S3SourceConfig s3SourceConfig = mock(S3SourceConfig.class); + final AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class); + when(s3SourceConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1); + + final S3ClientBuilderFactory factory = new S3ClientBuilderFactory(s3SourceConfig, credentialsProvider); + + final S3Client s3Client = factory.getS3Client(); + assertNotNull(s3Client, "S3Client should not be null"); + + assertNotNull(factory.getS3AsyncClient(), "S3AsyncClient should not be null"); + } +} diff --git a/data-prepper-plugins/sqs-common/build.gradle b/data-prepper-plugins/sqs-common/build.gradle index b4ffbc8e5e..e6c75d5795 100644 --- a/data-prepper-plugins/sqs-common/build.gradle +++ b/data-prepper-plugins/sqs-common/build.gradle @@ -7,6 +7,20 @@ plugins { id 'java' } +sourceSets { + integrationTest { + java { + compileClasspath += sourceSets.main.output + runtimeClasspath += sourceSets.main.output + } + } +} + +configurations { + integrationTestImplementation.extendsFrom implementation + integrationTestRuntimeOnly.extendsFrom runtimeOnly +} + dependencies { implementation project(':data-prepper-api') implementation project(':data-prepper-plugins:buffer-common') @@ -22,7 +36,18 @@ dependencies { implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final' testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' testImplementation project(':data-prepper-plugins:blocking-buffer') + integrationTestImplementation project(':data-prepper-core') + integrationTestImplementation 'io.micrometer:micrometer-core' + integrationTestImplementation 'org.junit.jupiter:junit-jupiter-api' + integrationTestRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine' } + test { useJUnitPlatform() } + +task integrationTest(type: Test) { + testClassesDirs = sourceSets.integrationTest.output.classesDirs + classpath = sourceSets.integrationTest.runtimeClasspath + useJUnitPlatform() +} diff --git a/data-prepper-plugins/sqs-common/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/sqs/common/SqsMetricsIT.java b/data-prepper-plugins/sqs-common/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/sqs/common/SqsMetricsIT.java new file mode 100644 index 0000000000..2851ae6354 --- /dev/null +++ b/data-prepper-plugins/sqs-common/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/sqs/common/SqsMetricsIT.java @@ -0,0 +1,51 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source.sqs.common; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Metrics; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.core.meter.EMFLoggingMeterRegistry; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.ListQueuesRequest; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +class SqsMetricsIT { + + private MeterRegistry meterRegistry; + + @BeforeEach + void setUp() { + meterRegistry = new EMFLoggingMeterRegistry(); + Metrics.globalRegistry.clear(); + Metrics.addRegistry(meterRegistry); + } + + @Test + void testSqsClientFactoryGeneratesMetrics() throws InterruptedException { + final SqsClient sqsClient = SqsClientFactory.createSqsClient( + Region.US_EAST_1, + DefaultCredentialsProvider.create() + ); + + try { + sqsClient.listQueues(ListQueuesRequest.builder().build()); + } catch (Exception ignored) { + } + + // Wait for EMF registry to publish metrics + Thread.sleep(2000); + + assertTrue(meterRegistry.getMeters().stream() + .anyMatch(meter -> meter.getId().getName().startsWith("aws.sqs.")), + "Expected SQS metrics to be generated"); + + sqsClient.close(); + } +} diff --git a/data-prepper-plugins/sqs-common/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/common/SqsClientFactory.java b/data-prepper-plugins/sqs-common/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/common/SqsClientFactory.java index 8754d87749..60c09c6fed 100644 --- a/data-prepper-plugins/sqs-common/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/common/SqsClientFactory.java +++ b/data-prepper-plugins/sqs-common/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/common/SqsClientFactory.java @@ -10,6 +10,8 @@ package org.opensearch.dataprepper.plugins.source.sqs.common; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.plugins.metricpublisher.MicrometerMetricPublisher; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.retry.RetryPolicy; @@ -28,11 +30,14 @@ public static SqsClient createSqsClient( final Region region, final AwsCredentialsProvider credentialsProvider) { + final PluginMetrics awsSdkMetrics = PluginMetrics.fromNames("sdk", "aws"); + return SqsClient.builder() .region(region) .credentialsProvider(credentialsProvider) .overrideConfiguration(ClientOverrideConfiguration.builder() .retryPolicy(RetryPolicy.builder().numRetries(5).build()) + .addMetricPublisher(new MicrometerMetricPublisher(awsSdkMetrics)) .build()) .build(); }