From 6cbde4adf9bef324c263cddc23c5f57a6ff57776 Mon Sep 17 00:00:00 2001 From: Manuel Mangas Zurita Date: Wed, 5 Nov 2025 15:15:33 +0000 Subject: [PATCH] Added throttle metric to S3 input stream Signed-off-by: Manuel Mangas Zurita --- data-prepper-plugins/s3-source/build.gradle | 1 + .../plugins/source/s3/S3MetricsIT.java | 105 ++++++++++++++++++ .../plugins/source/s3/S3InputStream.java | 2 + .../source/s3/S3ObjectPluginMetrics.java | 8 ++ .../plugins/source/s3/S3InputStreamTest.java | 16 +++ .../source/s3/S3ObjectPluginMetricsTest.java | 1 + 6 files changed, 133 insertions(+) create mode 100644 data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3MetricsIT.java diff --git a/data-prepper-plugins/s3-source/build.gradle b/data-prepper-plugins/s3-source/build.gradle index 47e29346fe..e226c984e5 100644 --- a/data-prepper-plugins/s3-source/build.gradle +++ b/data-prepper-plugins/s3-source/build.gradle @@ -43,6 +43,7 @@ dependencies { testImplementation project(':data-prepper-plugins:newline-codecs') testImplementation project(':data-prepper-plugins:avro-codecs') testImplementation project(':data-prepper-plugins:in-memory-source-coordination-store') + testImplementation 'io.micrometer:micrometer-registry-prometheus' testImplementation project(':data-prepper-core') testImplementation project(':data-prepper-event') testImplementation project(':data-prepper-plugins:parquet-codecs') diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3MetricsIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3MetricsIT.java new file mode 100644 index 0000000000..2031754a56 --- /dev/null +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3MetricsIT.java @@ -0,0 +1,105 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.s3; + +import io.micrometer.core.instrument.Metrics; +import io.micrometer.prometheus.PrometheusConfig; +import io.micrometer.prometheus.PrometheusMeterRegistry; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.source.s3.configuration.S3DataSelection; +import org.opensearch.dataprepper.plugins.source.s3.ownership.BucketOwnerProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; + +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; + +class S3MetricsIT { + + private static final int TIMEOUT_IN_MILLIS = 200; + + private S3Client s3Client; + private String s3Bucket; + private Buffer> buffer; + private S3ObjectPluginMetrics s3ObjectPluginMetrics; + private BucketOwnerProvider bucketOwnerProvider; + private EventMetadataModifier eventMetadataModifier; + private PrometheusMeterRegistry prometheusMeterRegistry; + + @BeforeEach + void setUp() { + prometheusMeterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT); + Metrics.addRegistry(prometheusMeterRegistry); + + s3Bucket = System.getProperty("tests.s3source.bucket"); + s3Client = S3Client.builder() + .region(Region.of(System.getProperty("tests.s3source.region"))) + .build(); + + buffer = mock(Buffer.class); + eventMetadataModifier = new EventMetadataModifier(S3SourceConfig.DEFAULT_METADATA_ROOT_KEY, false); + bucketOwnerProvider = b -> Optional.empty(); + + PluginMetrics pluginMetrics = PluginMetrics.fromNames("s3-source", "test-pipeline"); + s3ObjectPluginMetrics = new S3ObjectPluginMetrics(pluginMetrics); + } + + @AfterEach + void tearDown() { + Metrics.removeRegistry(prometheusMeterRegistry); + prometheusMeterRegistry.close(); + } + + @Test + void testS3WorkerFailureEmitsPrometheusMetrics() { + String nonExistentKey = "non-existent-key/" + UUID.randomUUID() + "_" + Instant.now().toString() + ".json"; + S3ObjectReference s3ObjectReference = S3ObjectReference.bucketAndKey(s3Bucket, nonExistentKey).build(); + S3ObjectWorker objectWorker = createS3Worker(); + + try { + objectWorker.processS3Object(s3ObjectReference, S3DataSelection.DATA_AND_METADATA, null, null, null); + } catch (Exception ignored) { + } + + String prometheusMetrics = prometheusMeterRegistry.scrape(); + + assertThat("Failed metric should be incremented", getMetricValue(prometheusMetrics, "test_pipeline_s3_source_s3ObjectsFailed_total"), equalTo(1.0)); + assertThat("Succeeded metric should be zero", getMetricValue(prometheusMetrics, "test_pipeline_s3_source_s3ObjectsSucceeded_total"), equalTo(0.0)); + assertThat("Throttled metric should be zero", getMetricValue(prometheusMetrics, "test_pipeline_s3_source_s3ObjectsThrottled_total"), equalTo(0.0)); + } + + private double getMetricValue(String prometheusMetrics, String metricName) { + String[] lines = prometheusMetrics.split("\n"); + for (String line : lines) { + if (line.startsWith(metricName + " ")) { + return Double.parseDouble(line.split(" ")[1]); + } + } + return 0.0; + } + + private S3ObjectWorker createS3Worker() { + final S3ObjectRequest request = new S3ObjectRequest.Builder(buffer, 100, + Duration.ofMillis(TIMEOUT_IN_MILLIS), s3ObjectPluginMetrics) + .bucketOwnerProvider(bucketOwnerProvider) + .s3Client(s3Client) + .build(); + + return new S3ObjectWorker(request); + } +} diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3InputStream.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3InputStream.java index b51df4571c..ad7ef86799 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3InputStream.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3InputStream.java @@ -668,6 +668,8 @@ private void recordS3Exception(final S3Exception ex) { s3ObjectPluginMetrics.getS3ObjectsFailedNotFoundCounter().increment(); } else if (ex.statusCode() == HttpStatusCode.FORBIDDEN) { s3ObjectPluginMetrics.getS3ObjectsFailedAccessDeniedCounter().increment(); + } else if (ex.isThrottlingException()) { + s3ObjectPluginMetrics.getS3ObjectsThrottledCounter().increment(); } } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectPluginMetrics.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectPluginMetrics.java index e54f95466f..d0c726a7c2 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectPluginMetrics.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectPluginMetrics.java @@ -20,7 +20,9 @@ public class S3ObjectPluginMetrics { static final String S3_OBJECTS_TIME_ELAPSED_METRIC_NAME = "s3ObjectReadTimeElapsed"; static final String S3_OBJECTS_SIZE = "s3ObjectSizeBytes"; static final String S3_OBJECTS_NO_RECORDS_FOUND = "s3ObjectNoRecordsFound"; + static final String S3_OBJECTS_THROTTLED_METRIC_NAME = "s3ObjectsThrottled"; private final Counter s3ObjectsFailedCounter; + private final Counter s3ObjectsThrottledCounter; private final Counter s3ObjectsFailedNotFoundCounter; private final Counter s3ObjectsFailedAccessDeniedCounter; private final Counter s3ObjectsSucceededCounter; @@ -34,6 +36,7 @@ public class S3ObjectPluginMetrics { public S3ObjectPluginMetrics(final PluginMetrics pluginMetrics){ s3ObjectsFailedCounter = pluginMetrics.counter(S3_OBJECTS_FAILED_METRIC_NAME); + s3ObjectsThrottledCounter = pluginMetrics.counter(S3_OBJECTS_THROTTLED_METRIC_NAME); s3ObjectsFailedNotFoundCounter = pluginMetrics.counter(S3_OBJECTS_FAILED_NOT_FOUND_METRIC_NAME); s3ObjectsFailedAccessDeniedCounter = pluginMetrics.counter(S3_OBJECTS_FAILED_NOT_FOUND_ACCESS_DENIED); s3ObjectsSucceededCounter = pluginMetrics.counter(S3_OBJECTS_SUCCEEDED_METRIC_NAME); @@ -76,9 +79,14 @@ public DistributionSummary getS3ObjectSizeProcessedSummary() { public DistributionSummary getS3ObjectEventsSummary() { return s3ObjectEventsSummary; } + public Counter getS3ObjectNoRecordsFound() { return s3ObjectNoRecordsFound; } + public Counter getS3ObjectsThrottledCounter() { + return s3ObjectsThrottledCounter; + } + public Counter getS3ObjectsDeleteFailed() { return s3ObjectsDeleteFailed; } } \ No newline at end of file diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3InputStreamTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3InputStreamTest.java index 5548242e8f..d42804f073 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3InputStreamTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3InputStreamTest.java @@ -60,6 +60,7 @@ class S3InputStreamTest { private DistributionSummary s3ObjectSizeProcessedSummary; private Counter s3ObjectsFailedNotFoundCounter; private Counter s3ObjectsFailedAccessDeniedCounter; + private Counter s3ObjectsThrottledCounter; private String bucketName; private String key; @@ -68,6 +69,7 @@ void setUp() { s3ObjectSizeProcessedSummary = mock(DistributionSummary.class); s3ObjectsFailedNotFoundCounter = mock(Counter.class); s3ObjectsFailedAccessDeniedCounter = mock(Counter.class); + s3ObjectsThrottledCounter = mock(Counter.class); bucketName = UUID.randomUUID().toString(); key = UUID.randomUUID().toString(); @@ -77,6 +79,7 @@ void setUp() { when(s3ObjectPluginMetrics.getS3ObjectSizeProcessedSummary()).thenReturn(s3ObjectSizeProcessedSummary); when(s3ObjectPluginMetrics.getS3ObjectsFailedNotFoundCounter()).thenReturn(s3ObjectsFailedNotFoundCounter); when(s3ObjectPluginMetrics.getS3ObjectsFailedAccessDeniedCounter()).thenReturn(s3ObjectsFailedAccessDeniedCounter); + when(s3ObjectPluginMetrics.getS3ObjectsThrottledCounter()).thenReturn(s3ObjectsThrottledCounter); } private S3InputStream createObjectUnderTest() { @@ -561,6 +564,19 @@ void testS3ObjectsFailedAccessDeniedCounter() { verify(s3ObjectsFailedAccessDeniedCounter).increment(); } + @Test + void testS3ObjectsThrottledCounter() { + S3Exception throttledException = mock(S3Exception.class); + when(throttledException.isThrottlingException()).thenReturn(true); + + when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenThrow(throttledException); + + final S3InputStream s3InputStream = createObjectUnderTest(); + assertThrows(IOException.class, () -> s3InputStream.read()); // Force opening the stream + + verify(s3ObjectsThrottledCounter).increment(); + } + private static Stream> retryableExceptions() { return S3InputStream.RETRYABLE_EXCEPTIONS.stream(); } diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectPluginMetricsTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectPluginMetricsTest.java index 851b89370d..6f887f4aec 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectPluginMetricsTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectPluginMetricsTest.java @@ -45,5 +45,6 @@ public void s3ObjectPluginMetricsTest(){ assertThat(metrics.getS3ObjectsSucceededCounter(),sameInstance(counter)); assertThat(metrics.getS3ObjectsFailedAccessDeniedCounter(),sameInstance(counter)); assertThat(metrics.getS3ObjectsFailedNotFoundCounter(),sameInstance(counter)); + assertThat(metrics.getS3ObjectsThrottledCounter(),sameInstance(counter)); } }