Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions data-prepper-plugins/s3-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ dependencies {
testImplementation project(':data-prepper-plugins:newline-codecs')
testImplementation project(':data-prepper-plugins:avro-codecs')
testImplementation project(':data-prepper-plugins:in-memory-source-coordination-store')
testImplementation 'io.micrometer:micrometer-registry-prometheus'
testImplementation project(':data-prepper-core')
testImplementation project(':data-prepper-event')
testImplementation project(':data-prepper-plugins:parquet-codecs')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.s3;

import io.micrometer.core.instrument.Metrics;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3DataSelection;
import org.opensearch.dataprepper.plugins.source.s3.ownership.BucketOwnerProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;

class S3MetricsIT {

private static final int TIMEOUT_IN_MILLIS = 200;

private S3Client s3Client;
private String s3Bucket;
private Buffer<Record<Event>> buffer;
private S3ObjectPluginMetrics s3ObjectPluginMetrics;
private BucketOwnerProvider bucketOwnerProvider;
private EventMetadataModifier eventMetadataModifier;
private PrometheusMeterRegistry prometheusMeterRegistry;

@BeforeEach
void setUp() {
prometheusMeterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
Metrics.addRegistry(prometheusMeterRegistry);

s3Bucket = System.getProperty("tests.s3source.bucket");
s3Client = S3Client.builder()
.region(Region.of(System.getProperty("tests.s3source.region")))
.build();

buffer = mock(Buffer.class);
eventMetadataModifier = new EventMetadataModifier(S3SourceConfig.DEFAULT_METADATA_ROOT_KEY, false);
bucketOwnerProvider = b -> Optional.empty();

PluginMetrics pluginMetrics = PluginMetrics.fromNames("s3-source", "test-pipeline");
s3ObjectPluginMetrics = new S3ObjectPluginMetrics(pluginMetrics);
}

@AfterEach
void tearDown() {
Metrics.removeRegistry(prometheusMeterRegistry);
prometheusMeterRegistry.close();
}

@Test
void testS3WorkerFailureEmitsPrometheusMetrics() {
String nonExistentKey = "non-existent-key/" + UUID.randomUUID() + "_" + Instant.now().toString() + ".json";
S3ObjectReference s3ObjectReference = S3ObjectReference.bucketAndKey(s3Bucket, nonExistentKey).build();
S3ObjectWorker objectWorker = createS3Worker();

try {
objectWorker.processS3Object(s3ObjectReference, S3DataSelection.DATA_AND_METADATA, null, null, null);
} catch (Exception ignored) {
}

String prometheusMetrics = prometheusMeterRegistry.scrape();

assertThat("Failed metric should be incremented", getMetricValue(prometheusMetrics, "test_pipeline_s3_source_s3ObjectsFailed_total"), equalTo(1.0));
assertThat("Succeeded metric should be zero", getMetricValue(prometheusMetrics, "test_pipeline_s3_source_s3ObjectsSucceeded_total"), equalTo(0.0));
assertThat("Throttled metric should be zero", getMetricValue(prometheusMetrics, "test_pipeline_s3_source_s3ObjectsThrottled_total"), equalTo(0.0));
}

private double getMetricValue(String prometheusMetrics, String metricName) {
String[] lines = prometheusMetrics.split("\n");
for (String line : lines) {
if (line.startsWith(metricName + " ")) {
return Double.parseDouble(line.split(" ")[1]);
}
}
return 0.0;
}

private S3ObjectWorker createS3Worker() {
final S3ObjectRequest request = new S3ObjectRequest.Builder(buffer, 100,
Duration.ofMillis(TIMEOUT_IN_MILLIS), s3ObjectPluginMetrics)
.bucketOwnerProvider(bucketOwnerProvider)
.s3Client(s3Client)
.build();

return new S3ObjectWorker(request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,8 @@ private void recordS3Exception(final S3Exception ex) {
s3ObjectPluginMetrics.getS3ObjectsFailedNotFoundCounter().increment();
} else if (ex.statusCode() == HttpStatusCode.FORBIDDEN) {
s3ObjectPluginMetrics.getS3ObjectsFailedAccessDeniedCounter().increment();
} else if (ex.isThrottlingException()) {
s3ObjectPluginMetrics.getS3ObjectsThrottledCounter().increment();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ public class S3ObjectPluginMetrics {
static final String S3_OBJECTS_TIME_ELAPSED_METRIC_NAME = "s3ObjectReadTimeElapsed";
static final String S3_OBJECTS_SIZE = "s3ObjectSizeBytes";
static final String S3_OBJECTS_NO_RECORDS_FOUND = "s3ObjectNoRecordsFound";
static final String S3_OBJECTS_THROTTLED_METRIC_NAME = "s3ObjectsThrottled";
private final Counter s3ObjectsFailedCounter;
private final Counter s3ObjectsThrottledCounter;
private final Counter s3ObjectsFailedNotFoundCounter;
private final Counter s3ObjectsFailedAccessDeniedCounter;
private final Counter s3ObjectsSucceededCounter;
Expand All @@ -34,6 +36,7 @@ public class S3ObjectPluginMetrics {

public S3ObjectPluginMetrics(final PluginMetrics pluginMetrics){
s3ObjectsFailedCounter = pluginMetrics.counter(S3_OBJECTS_FAILED_METRIC_NAME);
s3ObjectsThrottledCounter = pluginMetrics.counter(S3_OBJECTS_THROTTLED_METRIC_NAME);
s3ObjectsFailedNotFoundCounter = pluginMetrics.counter(S3_OBJECTS_FAILED_NOT_FOUND_METRIC_NAME);
s3ObjectsFailedAccessDeniedCounter = pluginMetrics.counter(S3_OBJECTS_FAILED_NOT_FOUND_ACCESS_DENIED);
s3ObjectsSucceededCounter = pluginMetrics.counter(S3_OBJECTS_SUCCEEDED_METRIC_NAME);
Expand Down Expand Up @@ -76,9 +79,14 @@ public DistributionSummary getS3ObjectSizeProcessedSummary() {
public DistributionSummary getS3ObjectEventsSummary() {
return s3ObjectEventsSummary;
}

public Counter getS3ObjectNoRecordsFound() {
return s3ObjectNoRecordsFound;
}

public Counter getS3ObjectsThrottledCounter() {
return s3ObjectsThrottledCounter;
}

public Counter getS3ObjectsDeleteFailed() { return s3ObjectsDeleteFailed; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class S3InputStreamTest {
private DistributionSummary s3ObjectSizeProcessedSummary;
private Counter s3ObjectsFailedNotFoundCounter;
private Counter s3ObjectsFailedAccessDeniedCounter;
private Counter s3ObjectsThrottledCounter;
private String bucketName;
private String key;

Expand All @@ -68,6 +69,7 @@ void setUp() {
s3ObjectSizeProcessedSummary = mock(DistributionSummary.class);
s3ObjectsFailedNotFoundCounter = mock(Counter.class);
s3ObjectsFailedAccessDeniedCounter = mock(Counter.class);
s3ObjectsThrottledCounter = mock(Counter.class);

bucketName = UUID.randomUUID().toString();
key = UUID.randomUUID().toString();
Expand All @@ -77,6 +79,7 @@ void setUp() {
when(s3ObjectPluginMetrics.getS3ObjectSizeProcessedSummary()).thenReturn(s3ObjectSizeProcessedSummary);
when(s3ObjectPluginMetrics.getS3ObjectsFailedNotFoundCounter()).thenReturn(s3ObjectsFailedNotFoundCounter);
when(s3ObjectPluginMetrics.getS3ObjectsFailedAccessDeniedCounter()).thenReturn(s3ObjectsFailedAccessDeniedCounter);
when(s3ObjectPluginMetrics.getS3ObjectsThrottledCounter()).thenReturn(s3ObjectsThrottledCounter);
}

private S3InputStream createObjectUnderTest() {
Expand Down Expand Up @@ -561,6 +564,19 @@ void testS3ObjectsFailedAccessDeniedCounter() {
verify(s3ObjectsFailedAccessDeniedCounter).increment();
}

@Test
void testS3ObjectsThrottledCounter() {
S3Exception throttledException = mock(S3Exception.class);
when(throttledException.isThrottlingException()).thenReturn(true);

when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenThrow(throttledException);

final S3InputStream s3InputStream = createObjectUnderTest();
assertThrows(IOException.class, () -> s3InputStream.read()); // Force opening the stream

verify(s3ObjectsThrottledCounter).increment();
}

private static Stream<Class<? extends Throwable>> retryableExceptions() {
return S3InputStream.RETRYABLE_EXCEPTIONS.stream();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,6 @@ public void s3ObjectPluginMetricsTest(){
assertThat(metrics.getS3ObjectsSucceededCounter(),sameInstance(counter));
assertThat(metrics.getS3ObjectsFailedAccessDeniedCounter(),sameInstance(counter));
assertThat(metrics.getS3ObjectsFailedNotFoundCounter(),sameInstance(counter));
assertThat(metrics.getS3ObjectsThrottledCounter(),sameInstance(counter));
}
}
Loading