Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.source.s3;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.core.meter.EMFLoggingMeterRegistry;
import org.opensearch.dataprepper.plugins.source.s3.configuration.AwsAuthenticationOptions;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListBucketsRequest;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class S3MetricsIT {

private MeterRegistry meterRegistry;

@BeforeEach
void setUp() {
meterRegistry = new EMFLoggingMeterRegistry();
Metrics.globalRegistry.clear();
Metrics.addRegistry(meterRegistry);
}

@Test
void testS3ClientBuilderFactoryGeneratesMetrics() throws InterruptedException {
final S3SourceConfig s3SourceConfig = mock(S3SourceConfig.class);
final AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class);
when(s3SourceConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions);
when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1);

final S3ClientBuilderFactory factory = new S3ClientBuilderFactory(s3SourceConfig, DefaultCredentialsProvider.create());
final S3Client s3Client = factory.getS3Client();

try {
s3Client.listBuckets(ListBucketsRequest.builder().build());
} catch (Exception ignored) {
}

// Wait for EMF registry to publish metrics
Thread.sleep(2000);

assertTrue(meterRegistry.getMeters().stream()
.anyMatch(meter -> meter.getId().getName().startsWith("aws.s3.")),
"Expected S3 metrics to be generated");

s3Client.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
*/
package org.opensearch.dataprepper.plugins.source.s3;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.plugins.metricpublisher.MicrometerMetricPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
Expand All @@ -23,24 +25,28 @@ public class S3ClientBuilderFactory {
private final AwsCredentialsProvider credentialsProvider;
private final S3Client s3Client;
private final S3AsyncClient s3AsyncClient;

public S3ClientBuilderFactory(final S3SourceConfig s3SourceConfig, AwsCredentialsProvider credentialsProvider){
this.s3SourceConfig = s3SourceConfig;
this.credentialsProvider = credentialsProvider;
this.s3Client = createS3Client();
this.s3AsyncClient = createS3AsyncClient();
}

/**
* Create a S3Client Object for download the s3 Objects
* @return a S3Client Object
*/
public S3Client createS3Client() {
LOG.info("Creating S3 client");
final PluginMetrics awsSdkMetrics = PluginMetrics.fromNames("sdk", "aws");
return S3Client.builder()
.region(s3SourceConfig.getAwsAuthenticationOptions().getAwsRegion())
.crossRegionAccessEnabled(true)
.credentialsProvider(credentialsProvider)
.overrideConfiguration(ClientOverrideConfiguration.builder()
.retryPolicy(retryPolicy -> retryPolicy.numRetries(5).build())
.addMetricPublisher(new MicrometerMetricPublisher(awsSdkMetrics))
.build())
.build();
}
Expand All @@ -51,6 +57,7 @@ public S3Client createS3Client() {
*/
public S3AsyncClient createS3AsyncClient() {
LOG.info("Creating S3 Async client");
final PluginMetrics awsSdkMetrics = PluginMetrics.fromNames("sdk", "aws");
return S3AsyncClient.builder()
.region(s3SourceConfig.getAwsAuthenticationOptions().getAwsRegion())
.crossRegionAccessEnabled(true)
Expand All @@ -60,6 +67,7 @@ public S3AsyncClient createS3AsyncClient() {
.credentialsProvider(credentialsProvider)
.overrideConfiguration(ClientOverrideConfiguration.builder()
.retryPolicy(retryPolicy -> retryPolicy.numRetries(5).build())
.addMetricPublisher(new MicrometerMetricPublisher(awsSdkMetrics))
.build())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.source.s3;

import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.plugins.source.s3.configuration.AwsAuthenticationOptions;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class S3ClientBuilderFactoryTest {

@Test
void testS3ClientBuilderFactoryCreatesClientsWithMicrometerIntegration() {
final StaticCredentialsProvider credentialsProvider =
StaticCredentialsProvider.create(AwsBasicCredentials.create("testKey", "testSecret"));

final S3SourceConfig s3SourceConfig = mock(S3SourceConfig.class);
final AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class);
when(s3SourceConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions);
when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1);

final S3ClientBuilderFactory factory = new S3ClientBuilderFactory(s3SourceConfig, credentialsProvider);

final S3Client s3Client = factory.getS3Client();
assertNotNull(s3Client, "S3Client should not be null");

assertNotNull(factory.getS3AsyncClient(), "S3AsyncClient should not be null");
}
}
25 changes: 25 additions & 0 deletions data-prepper-plugins/sqs-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,20 @@ plugins {
id 'java'
}

sourceSets {
integrationTest {
java {
compileClasspath += sourceSets.main.output
runtimeClasspath += sourceSets.main.output
}
}
}

configurations {
integrationTestImplementation.extendsFrom implementation
integrationTestRuntimeOnly.extendsFrom runtimeOnly
}

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:buffer-common')
Expand All @@ -22,7 +36,18 @@ dependencies {
implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final'
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation project(':data-prepper-plugins:blocking-buffer')
integrationTestImplementation project(':data-prepper-core')
integrationTestImplementation 'io.micrometer:micrometer-core'
integrationTestImplementation 'org.junit.jupiter:junit-jupiter-api'
integrationTestRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
}

test {
useJUnitPlatform()
}

task integrationTest(type: Test) {
testClassesDirs = sourceSets.integrationTest.output.classesDirs
classpath = sourceSets.integrationTest.runtimeClasspath
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.source.sqs.common;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.core.meter.EMFLoggingMeterRegistry;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.ListQueuesRequest;

import static org.junit.jupiter.api.Assertions.assertTrue;

class SqsMetricsIT {

private MeterRegistry meterRegistry;

@BeforeEach
void setUp() {
meterRegistry = new EMFLoggingMeterRegistry();
Metrics.globalRegistry.clear();
Metrics.addRegistry(meterRegistry);
}

@Test
void testSqsClientFactoryGeneratesMetrics() throws InterruptedException {
final SqsClient sqsClient = SqsClientFactory.createSqsClient(
Region.US_EAST_1,
DefaultCredentialsProvider.create()
);

try {
sqsClient.listQueues(ListQueuesRequest.builder().build());
} catch (Exception ignored) {
}

// Wait for EMF registry to publish metrics
Thread.sleep(2000);

assertTrue(meterRegistry.getMeters().stream()
.anyMatch(meter -> meter.getId().getName().startsWith("aws.sqs.")),
"Expected SQS metrics to be generated");

sqsClient.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

package org.opensearch.dataprepper.plugins.source.sqs.common;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.plugins.metricpublisher.MicrometerMetricPublisher;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
Expand All @@ -28,11 +30,14 @@ public static SqsClient createSqsClient(
final Region region,
final AwsCredentialsProvider credentialsProvider) {

final PluginMetrics awsSdkMetrics = PluginMetrics.fromNames("sdk", "aws");

return SqsClient.builder()
.region(region)
.credentialsProvider(credentialsProvider)
.overrideConfiguration(ClientOverrideConfiguration.builder()
.retryPolicy(RetryPolicy.builder().numRetries(5).build())
.addMetricPublisher(new MicrometerMetricPublisher(awsSdkMetrics))
.build())
.build();
}
Expand Down
Loading