Skip to content

Commit 6cbde4a

Browse files
committed
Added throttle metric to S3 input stream
Signed-off-by: Manuel Mangas Zurita <mzurita@amazon.com>
1 parent e53033d commit 6cbde4a

6 files changed

Lines changed: 133 additions & 0 deletions

File tree

data-prepper-plugins/s3-source/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ dependencies {
4343
testImplementation project(':data-prepper-plugins:newline-codecs')
4444
testImplementation project(':data-prepper-plugins:avro-codecs')
4545
testImplementation project(':data-prepper-plugins:in-memory-source-coordination-store')
46+
testImplementation 'io.micrometer:micrometer-registry-prometheus'
4647
testImplementation project(':data-prepper-core')
4748
testImplementation project(':data-prepper-event')
4849
testImplementation project(':data-prepper-plugins:parquet-codecs')
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.plugins.source.s3;
7+
8+
import io.micrometer.core.instrument.Metrics;
9+
import io.micrometer.prometheus.PrometheusConfig;
10+
import io.micrometer.prometheus.PrometheusMeterRegistry;
11+
import org.junit.jupiter.api.AfterEach;
12+
import org.junit.jupiter.api.BeforeEach;
13+
import org.junit.jupiter.api.Test;
14+
import org.opensearch.dataprepper.metrics.PluginMetrics;
15+
import org.opensearch.dataprepper.model.buffer.Buffer;
16+
import org.opensearch.dataprepper.model.event.Event;
17+
import org.opensearch.dataprepper.model.record.Record;
18+
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3DataSelection;
19+
import org.opensearch.dataprepper.plugins.source.s3.ownership.BucketOwnerProvider;
20+
import software.amazon.awssdk.regions.Region;
21+
import software.amazon.awssdk.services.s3.S3Client;
22+
23+
import java.time.Duration;
24+
import java.time.Instant;
25+
import java.util.Optional;
26+
import java.util.UUID;
27+
28+
import static org.hamcrest.CoreMatchers.equalTo;
29+
import static org.hamcrest.MatcherAssert.assertThat;
30+
import static org.mockito.Mockito.mock;
31+
32+
class S3MetricsIT {
33+
34+
private static final int TIMEOUT_IN_MILLIS = 200;
35+
36+
private S3Client s3Client;
37+
private String s3Bucket;
38+
private Buffer<Record<Event>> buffer;
39+
private S3ObjectPluginMetrics s3ObjectPluginMetrics;
40+
private BucketOwnerProvider bucketOwnerProvider;
41+
private EventMetadataModifier eventMetadataModifier;
42+
private PrometheusMeterRegistry prometheusMeterRegistry;
43+
44+
@BeforeEach
45+
void setUp() {
46+
prometheusMeterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
47+
Metrics.addRegistry(prometheusMeterRegistry);
48+
49+
s3Bucket = System.getProperty("tests.s3source.bucket");
50+
s3Client = S3Client.builder()
51+
.region(Region.of(System.getProperty("tests.s3source.region")))
52+
.build();
53+
54+
buffer = mock(Buffer.class);
55+
eventMetadataModifier = new EventMetadataModifier(S3SourceConfig.DEFAULT_METADATA_ROOT_KEY, false);
56+
bucketOwnerProvider = b -> Optional.empty();
57+
58+
PluginMetrics pluginMetrics = PluginMetrics.fromNames("s3-source", "test-pipeline");
59+
s3ObjectPluginMetrics = new S3ObjectPluginMetrics(pluginMetrics);
60+
}
61+
62+
@AfterEach
63+
void tearDown() {
64+
Metrics.removeRegistry(prometheusMeterRegistry);
65+
prometheusMeterRegistry.close();
66+
}
67+
68+
@Test
69+
void testS3WorkerFailureEmitsPrometheusMetrics() {
70+
String nonExistentKey = "non-existent-key/" + UUID.randomUUID() + "_" + Instant.now().toString() + ".json";
71+
S3ObjectReference s3ObjectReference = S3ObjectReference.bucketAndKey(s3Bucket, nonExistentKey).build();
72+
S3ObjectWorker objectWorker = createS3Worker();
73+
74+
try {
75+
objectWorker.processS3Object(s3ObjectReference, S3DataSelection.DATA_AND_METADATA, null, null, null);
76+
} catch (Exception ignored) {
77+
}
78+
79+
String prometheusMetrics = prometheusMeterRegistry.scrape();
80+
81+
assertThat("Failed metric should be incremented", getMetricValue(prometheusMetrics, "test_pipeline_s3_source_s3ObjectsFailed_total"), equalTo(1.0));
82+
assertThat("Succeeded metric should be zero", getMetricValue(prometheusMetrics, "test_pipeline_s3_source_s3ObjectsSucceeded_total"), equalTo(0.0));
83+
assertThat("Throttled metric should be zero", getMetricValue(prometheusMetrics, "test_pipeline_s3_source_s3ObjectsThrottled_total"), equalTo(0.0));
84+
}
85+
86+
private double getMetricValue(String prometheusMetrics, String metricName) {
87+
String[] lines = prometheusMetrics.split("\n");
88+
for (String line : lines) {
89+
if (line.startsWith(metricName + " ")) {
90+
return Double.parseDouble(line.split(" ")[1]);
91+
}
92+
}
93+
return 0.0;
94+
}
95+
96+
private S3ObjectWorker createS3Worker() {
97+
final S3ObjectRequest request = new S3ObjectRequest.Builder(buffer, 100,
98+
Duration.ofMillis(TIMEOUT_IN_MILLIS), s3ObjectPluginMetrics)
99+
.bucketOwnerProvider(bucketOwnerProvider)
100+
.s3Client(s3Client)
101+
.build();
102+
103+
return new S3ObjectWorker(request);
104+
}
105+
}

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3InputStream.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -668,6 +668,8 @@ private void recordS3Exception(final S3Exception ex) {
668668
s3ObjectPluginMetrics.getS3ObjectsFailedNotFoundCounter().increment();
669669
} else if (ex.statusCode() == HttpStatusCode.FORBIDDEN) {
670670
s3ObjectPluginMetrics.getS3ObjectsFailedAccessDeniedCounter().increment();
671+
} else if (ex.isThrottlingException()) {
672+
s3ObjectPluginMetrics.getS3ObjectsThrottledCounter().increment();
671673
}
672674
}
673675

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectPluginMetrics.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ public class S3ObjectPluginMetrics {
2020
static final String S3_OBJECTS_TIME_ELAPSED_METRIC_NAME = "s3ObjectReadTimeElapsed";
2121
static final String S3_OBJECTS_SIZE = "s3ObjectSizeBytes";
2222
static final String S3_OBJECTS_NO_RECORDS_FOUND = "s3ObjectNoRecordsFound";
23+
static final String S3_OBJECTS_THROTTLED_METRIC_NAME = "s3ObjectsThrottled";
2324
private final Counter s3ObjectsFailedCounter;
25+
private final Counter s3ObjectsThrottledCounter;
2426
private final Counter s3ObjectsFailedNotFoundCounter;
2527
private final Counter s3ObjectsFailedAccessDeniedCounter;
2628
private final Counter s3ObjectsSucceededCounter;
@@ -34,6 +36,7 @@ public class S3ObjectPluginMetrics {
3436

3537
public S3ObjectPluginMetrics(final PluginMetrics pluginMetrics){
3638
s3ObjectsFailedCounter = pluginMetrics.counter(S3_OBJECTS_FAILED_METRIC_NAME);
39+
s3ObjectsThrottledCounter = pluginMetrics.counter(S3_OBJECTS_THROTTLED_METRIC_NAME);
3740
s3ObjectsFailedNotFoundCounter = pluginMetrics.counter(S3_OBJECTS_FAILED_NOT_FOUND_METRIC_NAME);
3841
s3ObjectsFailedAccessDeniedCounter = pluginMetrics.counter(S3_OBJECTS_FAILED_NOT_FOUND_ACCESS_DENIED);
3942
s3ObjectsSucceededCounter = pluginMetrics.counter(S3_OBJECTS_SUCCEEDED_METRIC_NAME);
@@ -76,9 +79,14 @@ public DistributionSummary getS3ObjectSizeProcessedSummary() {
7679
public DistributionSummary getS3ObjectEventsSummary() {
7780
return s3ObjectEventsSummary;
7881
}
82+
7983
public Counter getS3ObjectNoRecordsFound() {
8084
return s3ObjectNoRecordsFound;
8185
}
8286

87+
public Counter getS3ObjectsThrottledCounter() {
88+
return s3ObjectsThrottledCounter;
89+
}
90+
8391
public Counter getS3ObjectsDeleteFailed() { return s3ObjectsDeleteFailed; }
8492
}

data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3InputStreamTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ class S3InputStreamTest {
6060
private DistributionSummary s3ObjectSizeProcessedSummary;
6161
private Counter s3ObjectsFailedNotFoundCounter;
6262
private Counter s3ObjectsFailedAccessDeniedCounter;
63+
private Counter s3ObjectsThrottledCounter;
6364
private String bucketName;
6465
private String key;
6566

@@ -68,6 +69,7 @@ void setUp() {
6869
s3ObjectSizeProcessedSummary = mock(DistributionSummary.class);
6970
s3ObjectsFailedNotFoundCounter = mock(Counter.class);
7071
s3ObjectsFailedAccessDeniedCounter = mock(Counter.class);
72+
s3ObjectsThrottledCounter = mock(Counter.class);
7173

7274
bucketName = UUID.randomUUID().toString();
7375
key = UUID.randomUUID().toString();
@@ -77,6 +79,7 @@ void setUp() {
7779
when(s3ObjectPluginMetrics.getS3ObjectSizeProcessedSummary()).thenReturn(s3ObjectSizeProcessedSummary);
7880
when(s3ObjectPluginMetrics.getS3ObjectsFailedNotFoundCounter()).thenReturn(s3ObjectsFailedNotFoundCounter);
7981
when(s3ObjectPluginMetrics.getS3ObjectsFailedAccessDeniedCounter()).thenReturn(s3ObjectsFailedAccessDeniedCounter);
82+
when(s3ObjectPluginMetrics.getS3ObjectsThrottledCounter()).thenReturn(s3ObjectsThrottledCounter);
8083
}
8184

8285
private S3InputStream createObjectUnderTest() {
@@ -561,6 +564,19 @@ void testS3ObjectsFailedAccessDeniedCounter() {
561564
verify(s3ObjectsFailedAccessDeniedCounter).increment();
562565
}
563566

567+
@Test
568+
void testS3ObjectsThrottledCounter() {
569+
S3Exception throttledException = mock(S3Exception.class);
570+
when(throttledException.isThrottlingException()).thenReturn(true);
571+
572+
when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenThrow(throttledException);
573+
574+
final S3InputStream s3InputStream = createObjectUnderTest();
575+
assertThrows(IOException.class, () -> s3InputStream.read()); // Force opening the stream
576+
577+
verify(s3ObjectsThrottledCounter).increment();
578+
}
579+
564580
private static Stream<Class<? extends Throwable>> retryableExceptions() {
565581
return S3InputStream.RETRYABLE_EXCEPTIONS.stream();
566582
}

data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectPluginMetricsTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,6 @@ public void s3ObjectPluginMetricsTest(){
4545
assertThat(metrics.getS3ObjectsSucceededCounter(),sameInstance(counter));
4646
assertThat(metrics.getS3ObjectsFailedAccessDeniedCounter(),sameInstance(counter));
4747
assertThat(metrics.getS3ObjectsFailedNotFoundCounter(),sameInstance(counter));
48+
assertThat(metrics.getS3ObjectsThrottledCounter(),sameInstance(counter));
4849
}
4950
}

0 commit comments

Comments
 (0)