Skip to content

Commit f266312

Browse files
authored
Added API metrics to SQS common worker (#6248)
Signed-off-by: Manuel Mangas Zurita <mzurita@amazon.com>
1 parent 68d99dd commit f266312

4 files changed

Lines changed: 199 additions & 0 deletions

File tree

data-prepper-plugins/sqs-common/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/common/SqsWorkerCommon.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,18 @@
1717
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
1818
import org.slf4j.Logger;
1919
import org.slf4j.LoggerFactory;
20+
import software.amazon.awssdk.awscore.exception.AwsServiceException;
2021
import software.amazon.awssdk.core.exception.SdkException;
2122
import software.amazon.awssdk.services.sqs.SqsClient;
2223
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
2324
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
2425
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
2526
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
27+
import software.amazon.awssdk.services.sqs.model.KmsAccessDeniedException;
28+
import software.amazon.awssdk.services.sqs.model.KmsNotFoundException;
29+
import software.amazon.awssdk.services.sqs.model.KmsThrottledException;
2630
import software.amazon.awssdk.services.sqs.model.Message;
31+
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
2732
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
2833
import software.amazon.awssdk.services.sqs.model.SqsException;
2934
import software.amazon.awssdk.services.sts.model.StsException;
@@ -41,6 +46,9 @@ public class SqsWorkerCommon {
4146
public static final String SQS_MESSAGES_DELETE_FAILED_METRIC_NAME = "sqsMessagesDeleteFailed";
4247
public static final String SQS_VISIBILITY_TIMEOUT_CHANGED_COUNT_METRIC_NAME = "sqsVisibilityTimeoutChangedCount";
4348
public static final String SQS_VISIBILITY_TIMEOUT_CHANGE_FAILED_COUNT_METRIC_NAME = "sqsVisibilityTimeoutChangeFailedCount";
49+
public static final String SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME = "sqsMessagesAccessDenied";
50+
public static final String SQS_MESSAGE_THROTTLED_METRIC_NAME = "sqsMessagesThrottled";
51+
public static final String SQS_QUEUE_NOT_FOUND_METRIC_NAME = "sqsQueueNotFound";
4452

4553
private final Backoff standardBackoff;
4654
private final PluginMetrics pluginMetrics;
@@ -54,6 +62,9 @@ public class SqsWorkerCommon {
5462
private final Counter acknowledgementSetCallbackCounter;
5563
private final Counter sqsVisibilityTimeoutChangedCount;
5664
private final Counter sqsVisibilityTimeoutChangeFailedCount;
65+
private final Counter sqsMessageAccessDeniedCounter;
66+
private final Counter sqsMessageThrottledCounter;
67+
private final Counter sqsQueueNotFoundCounter;
5768

5869
public SqsWorkerCommon(final Backoff standardBackoff,
5970
final PluginMetrics pluginMetrics,
@@ -73,6 +84,9 @@ public SqsWorkerCommon(final Backoff standardBackoff,
7384
acknowledgementSetCallbackCounter = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME);
7485
sqsVisibilityTimeoutChangedCount = pluginMetrics.counter(SQS_VISIBILITY_TIMEOUT_CHANGED_COUNT_METRIC_NAME);
7586
sqsVisibilityTimeoutChangeFailedCount = pluginMetrics.counter(SQS_VISIBILITY_TIMEOUT_CHANGE_FAILED_COUNT_METRIC_NAME);
87+
sqsMessageAccessDeniedCounter = pluginMetrics.counter(SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME);
88+
sqsMessageThrottledCounter = pluginMetrics.counter(SQS_MESSAGE_THROTTLED_METRIC_NAME);
89+
sqsQueueNotFoundCounter = pluginMetrics.counter(SQS_QUEUE_NOT_FOUND_METRIC_NAME);
7690
}
7791

7892
public List<Message> pollSqsMessages(final String queueUrl,
@@ -91,6 +105,7 @@ public List<Message> pollSqsMessages(final String queueUrl,
91105
}
92106
catch (SqsException | StsException e) {
93107
LOG.error("Error reading from SQS: {}. Retrying with exponential backoff.", e.getMessage());
108+
recordSqsException(e);
94109
applyBackoff();
95110
return Collections.emptyList();
96111
}
@@ -212,4 +227,21 @@ public Counter getSqsMessagesDeletedCounter() {
212227
public void stop() {
213228
isStopped = true;
214229
}
230+
231+
public void recordSqsException(final AwsServiceException e) {
232+
// AWS SQS emits some of their exceptions without the matching HTTP code. As we want to generate an aggregate version of
233+
// these exceptions, we have to explicitly catch the type alongside the status code for the ones that leverage the status
234+
// code (i.e. InvalidAddressException)
235+
if (e.statusCode() == 403 ||
236+
e instanceof KmsAccessDeniedException) {
237+
sqsMessageAccessDeniedCounter.increment();
238+
} else if (e.statusCode() == 404 ||
239+
e instanceof QueueDoesNotExistException ||
240+
e instanceof KmsNotFoundException) {
241+
sqsQueueNotFoundCounter.increment();
242+
} else if (e.isThrottlingException() ||
243+
e instanceof KmsThrottledException) {
244+
sqsMessageThrottledCounter.increment();
245+
}
246+
}
215247
}

data-prepper-plugins/sqs-common/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/common/SqsWorkerCommonTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.mockito.ArgumentCaptor;
1818
import org.opensearch.dataprepper.metrics.PluginMetrics;
1919
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
20+
import software.amazon.awssdk.awscore.exception.AwsServiceException;
2021
import software.amazon.awssdk.core.exception.SdkException;
2122
import software.amazon.awssdk.services.sqs.SqsClient;
2223
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
@@ -29,6 +30,10 @@
2930
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
3031
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
3132
import software.amazon.awssdk.services.sqs.model.SqsException;
33+
import software.amazon.awssdk.services.sqs.model.KmsThrottledException;
34+
import software.amazon.awssdk.services.sqs.model.KmsAccessDeniedException;
35+
import software.amazon.awssdk.services.sqs.model.KmsNotFoundException;
36+
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
3237
import java.time.Duration;
3338
import java.util.Arrays;
3439
import java.util.Collections;
@@ -46,6 +51,7 @@
4651
import static org.mockito.Mockito.mock;
4752
import static org.mockito.Mockito.never;
4853
import static org.mockito.Mockito.verify;
54+
import static org.mockito.Mockito.verifyNoMoreInteractions;
4955
import static org.mockito.Mockito.when;
5056

5157

@@ -223,4 +229,56 @@ void testIncreaseVisibilityTimeout_whenException() {
223229

224230
verify(sqsVisibilityTimeoutChangeFailedCount).increment();
225231
}
232+
233+
@Test
234+
void testRecordSqsException_KmsThrottledException() {
235+
testRecordSqsException(mock(KmsThrottledException.class), SqsWorkerCommon.SQS_MESSAGE_THROTTLED_METRIC_NAME);
236+
}
237+
238+
@Test
239+
void testRecordSqsException_403Status() {
240+
final SqsException sqsException = mock(SqsException.class);
241+
when(sqsException.statusCode()).thenReturn(403);
242+
testRecordSqsException(sqsException, SqsWorkerCommon.SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME);
243+
}
244+
245+
@Test
246+
void testRecordSqsException_KmsAccessDeniedException() {
247+
testRecordSqsException(mock(KmsAccessDeniedException.class), SqsWorkerCommon.SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME);
248+
}
249+
250+
@Test
251+
void testRecordSqsException_404Status() {
252+
final SqsException sqsException = mock(SqsException.class);
253+
when(sqsException.statusCode()).thenReturn(404);
254+
testRecordSqsException(sqsException, SqsWorkerCommon.SQS_QUEUE_NOT_FOUND_METRIC_NAME);
255+
}
256+
257+
@Test
258+
void testRecordSqsException_QueueDoesNotExistException() {
259+
testRecordSqsException(mock(QueueDoesNotExistException.class), SqsWorkerCommon.SQS_QUEUE_NOT_FOUND_METRIC_NAME);
260+
}
261+
262+
@Test
263+
void testRecordSqsException_KmsNotFoundException() {
264+
testRecordSqsException(mock(KmsNotFoundException.class), SqsWorkerCommon.SQS_QUEUE_NOT_FOUND_METRIC_NAME);
265+
}
266+
267+
@Test
268+
void testRecordSqsException_isThrottlingException() {
269+
final SqsException sqsException = mock(SqsException.class);
270+
when(sqsException.isThrottlingException()).thenReturn(true);
271+
testRecordSqsException(sqsException, SqsWorkerCommon.SQS_MESSAGE_THROTTLED_METRIC_NAME);
272+
}
273+
274+
private void testRecordSqsException(AwsServiceException exception, String metricName) {
275+
final Counter expectedCounter = mock(Counter.class);
276+
when(pluginMetrics.counter(metricName)).thenReturn(expectedCounter);
277+
final SqsWorkerCommon testWorker = new SqsWorkerCommon(backoff, pluginMetrics, acknowledgementSetManager);
278+
279+
testWorker.recordSqsException(exception);
280+
281+
verify(expectedCounter).increment();
282+
verifyNoMoreInteractions(expectedCounter);
283+
}
226284
}

data-prepper-plugins/sqs-source/build.gradle

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,22 @@ plugins {
77
id 'java'
88
}
99

10+
sourceSets {
11+
integrationTest {
12+
java {
13+
compileClasspath += main.output + test.output
14+
runtimeClasspath += main.output + test.output
15+
srcDir file('src/integrationTest/java')
16+
}
17+
resources.srcDir file('src/integrationTest/resources')
18+
}
19+
}
20+
21+
configurations {
22+
integrationTestImplementation.extendsFrom testImplementation
23+
integrationTestRuntime.extendsFrom testRuntime
24+
}
25+
1026
dependencies {
1127
implementation project(':data-prepper-api')
1228
implementation project(':data-prepper-plugins:buffer-common')
@@ -23,7 +39,20 @@ dependencies {
2339
implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final'
2440
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
2541
testImplementation project(':data-prepper-plugins:blocking-buffer')
42+
integrationTestImplementation 'io.micrometer:micrometer-registry-prometheus'
2643
}
44+
2745
test {
2846
useJUnitPlatform()
2947
}
48+
49+
task integrationTest(type: Test) {
50+
group = 'verification'
51+
testClassesDirs = sourceSets.integrationTest.output.classesDirs
52+
classpath = sourceSets.integrationTest.runtimeClasspath
53+
useJUnitPlatform()
54+
55+
filter {
56+
includeTestsMatching '*IT'
57+
}
58+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.plugins.source.sqs;
7+
8+
import com.linecorp.armeria.client.retry.Backoff;
9+
import io.micrometer.core.instrument.Metrics;
10+
import io.micrometer.prometheus.PrometheusConfig;
11+
import io.micrometer.prometheus.PrometheusMeterRegistry;
12+
import org.junit.jupiter.api.AfterEach;
13+
import org.junit.jupiter.api.BeforeEach;
14+
import org.junit.jupiter.api.Test;
15+
import org.opensearch.dataprepper.metrics.PluginMetrics;
16+
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
17+
import org.opensearch.dataprepper.plugins.source.sqs.common.SqsWorkerCommon;
18+
import software.amazon.awssdk.regions.Region;
19+
import software.amazon.awssdk.services.sqs.SqsClient;
20+
21+
import static org.hamcrest.CoreMatchers.equalTo;
22+
import static org.hamcrest.MatcherAssert.assertThat;
23+
import static org.mockito.Mockito.mock;
24+
25+
class SqsMetricsIT {
26+
private SqsClient sqsClient;
27+
private SqsWorkerCommon sqsWorkerCommon;
28+
private PrometheusMeterRegistry prometheusMeterRegistry;
29+
30+
@BeforeEach
31+
void setUp() {
32+
prometheusMeterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
33+
Metrics.addRegistry(prometheusMeterRegistry);
34+
35+
sqsClient = SqsClient.builder()
36+
.region(Region.US_EAST_1)
37+
.build();
38+
39+
final Backoff backoff = mock(Backoff.class);
40+
final AcknowledgementSetManager acknowledgementSetManager = mock(AcknowledgementSetManager.class);
41+
PluginMetrics pluginMetrics = PluginMetrics.fromNames("sqs_source", "test_pipeline");
42+
sqsWorkerCommon = new SqsWorkerCommon(backoff, pluginMetrics, acknowledgementSetManager);
43+
}
44+
45+
@AfterEach
46+
void tearDown() {
47+
Metrics.removeRegistry(prometheusMeterRegistry);
48+
prometheusMeterRegistry.close();
49+
}
50+
51+
@Test
52+
void testSqsWorkerFailureEmitsPrometheusMetrics() {
53+
String nonExistentQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/non-existent-queue";
54+
55+
try {
56+
sqsWorkerCommon.pollSqsMessages(nonExistentQueueUrl, sqsClient, 10, null, null);
57+
} catch (Exception ignored) {
58+
}
59+
60+
String prometheusMetrics = prometheusMeterRegistry.scrape();
61+
62+
// Should increment queue not found metric for non-existent queue
63+
assertThat("Queue not found metric should be incremented",
64+
getMetricValue(prometheusMetrics, "test_pipeline_sqs_source_sqsQueueNotFound_total"), equalTo(1.0));
65+
assertThat("Access denied metric should be zero",
66+
getMetricValue(prometheusMetrics, "test_pipeline_sqs_source_sqsMessagesAccessDenied_total"), equalTo(0.0));
67+
assertThat("Throttled metric should be zero",
68+
getMetricValue(prometheusMetrics, "test_pipeline_sqs_source_sqsMessagesThrottled_total"), equalTo(0.0));
69+
}
70+
71+
private double getMetricValue(String prometheusMetrics, String metricName) {
72+
String[] lines = prometheusMetrics.split("\n");
73+
for (String line : lines) {
74+
if (line.startsWith(metricName + " ")) {
75+
return Double.parseDouble(line.split(" ")[1]);
76+
}
77+
}
78+
return 0.0;
79+
}
80+
}

0 commit comments

Comments
 (0)