Skip to content

Commit 0bef3dd

Browse files
committed
Add SQS exception metrics to S3 source
Signed-off-by: Manuel Mangas Zurita <mzurita@amazon.com>
1 parent 1b4979c commit 0bef3dd

5 files changed

Lines changed: 89 additions & 87 deletions

File tree

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@
3333
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
3434
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
3535
import software.amazon.awssdk.services.sqs.model.SqsException;
36+
import software.amazon.awssdk.awscore.exception.AwsServiceException;
37+
import software.amazon.awssdk.services.sqs.model.KmsAccessDeniedException;
38+
import software.amazon.awssdk.services.sqs.model.KmsNotFoundException;
39+
import software.amazon.awssdk.services.sqs.model.KmsThrottledException;
40+
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
3641
import software.amazon.awssdk.services.sts.model.StsException;
3742

3843
import java.time.Duration;
@@ -58,6 +63,9 @@ public class SqsWorker implements Runnable {
5863
static final String SQS_VISIBILITY_TIMEOUT_CHANGED_COUNT_METRIC_NAME = "sqsVisibilityTimeoutChangedCount";
5964
static final String SQS_VISIBILITY_TIMEOUT_CHANGE_FAILED_COUNT_METRIC_NAME = "sqsVisibilityTimeoutChangeFailedCount";
6065
static final String ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME = "acknowledgementSetCallbackCounter";
66+
static final String SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME = "sqsMessagesAccessDenied";
67+
static final String SQS_MESSAGE_THROTTLED_METRIC_NAME = "sqsMessagesThrottled";
68+
static final String SQS_RESOURCE_NOT_FOUND_METRIC_NAME = "sqsResourceNotFound";
6169

6270
private final S3SourceConfig s3SourceConfig;
6371
private final SqsClient sqsClient;
@@ -74,6 +82,9 @@ public class SqsWorker implements Runnable {
7482
private final Counter acknowledgementSetCallbackCounter;
7583
private final Counter sqsVisibilityTimeoutChangedCount;
7684
private final Counter sqsVisibilityTimeoutChangeFailedCount;
85+
private final Counter sqsMessageAccessDeniedCounter;
86+
private final Counter sqsMessageThrottledCounter;
87+
private final Counter sqsResourceNotFoundCounter;
7788
private final Timer sqsMessageDelayTimer;
7889
private final Backoff standardBackoff;
7990
private final SqsMessageParser sqsMessageParser;
@@ -111,6 +122,9 @@ public SqsWorker(final AcknowledgementSetManager acknowledgementSetManager,
111122
acknowledgementSetCallbackCounter = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME);
112123
sqsVisibilityTimeoutChangedCount = pluginMetrics.counter(SQS_VISIBILITY_TIMEOUT_CHANGED_COUNT_METRIC_NAME);
113124
sqsVisibilityTimeoutChangeFailedCount = pluginMetrics.counter(SQS_VISIBILITY_TIMEOUT_CHANGE_FAILED_COUNT_METRIC_NAME);
125+
sqsMessageAccessDeniedCounter = pluginMetrics.counter(SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME);
126+
sqsMessageThrottledCounter = pluginMetrics.counter(SQS_MESSAGE_THROTTLED_METRIC_NAME);
127+
sqsResourceNotFoundCounter = pluginMetrics.counter(SQS_RESOURCE_NOT_FOUND_METRIC_NAME);
114128
}
115129

116130
@Override
@@ -164,6 +178,7 @@ private List<Message> getMessagesFromSqs() {
164178
return messages;
165179
} catch (final SqsException | StsException e) {
166180
LOG.error("Error reading from SQS: {}. Retrying with exponential backoff.", e.getMessage());
181+
recordSqsException(e);
167182
sqsReceiveMessagesFailedCounter.increment();
168183
applyBackoff();
169184
return Collections.emptyList();
@@ -452,4 +467,21 @@ private int getApproximateReceiveCount(final Message message) {
452467
void stop() {
453468
isStopped = true;
454469
}
470+
471+
private void recordSqsException(final AwsServiceException e) {
472+
// AWS SQS emits some of their exceptions without the matching HTTP code. As we want to generate an aggregate version of
473+
// these exceptions, we have to explicitly catch the type alongside the status code for the ones that leverage the status
474+
// code (i.e. InvalidAddressException)
475+
if (e.statusCode() == 403 ||
476+
e instanceof KmsAccessDeniedException) {
477+
sqsMessageAccessDeniedCounter.increment();
478+
} else if (e.statusCode() == 404 ||
479+
e instanceof QueueDoesNotExistException ||
480+
e instanceof KmsNotFoundException) {
481+
sqsResourceNotFoundCounter.increment();
482+
} else if (e.isThrottlingException() ||
483+
e instanceof KmsThrottledException) {
484+
sqsMessageThrottledCounter.increment();
485+
}
486+
}
455487
}

data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,10 @@
7171
import static org.junit.jupiter.params.provider.Arguments.arguments;
7272
import static org.mockito.ArgumentMatchers.any;
7373
import static org.mockito.ArgumentMatchers.anyInt;
74+
import static org.mockito.ArgumentMatchers.anyString;
7475
import static org.mockito.ArgumentMatchers.eq;
7576
import static org.mockito.Mockito.inOrder;
77+
import static org.mockito.Mockito.lenient;
7678
import static org.mockito.Mockito.mock;
7779
import static org.mockito.Mockito.never;
7880
import static org.mockito.Mockito.reset;
@@ -833,6 +835,54 @@ void processSqsMessages_should_record_zero_message_delay_when_no_messages_are_fo
833835
verify(sqsMessageDelayTimer).record(Duration.ZERO);
834836
}
835837

838+
@Test
839+
void run_increments_access_denied_metric_on_403_exception() {
840+
final SqsException sqsException = mock(SqsException.class);
841+
when(sqsException.statusCode()).thenReturn(403);
842+
when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenThrow(sqsException);
843+
844+
final Counter accessDeniedCounter = mock(Counter.class);
845+
when(pluginMetrics.counter(SqsWorker.SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME)).thenReturn(accessDeniedCounter);
846+
lenient().when(pluginMetrics.counter(anyString())).thenReturn(mock(Counter.class));
847+
848+
final SqsWorker sqsWorker = createObjectUnderTest();
849+
sqsWorker.run();
850+
851+
verify(accessDeniedCounter).increment();
852+
}
853+
854+
@Test
855+
void run_increments_queue_not_found_metric_on_404_exception() {
856+
final SqsException sqsException = mock(SqsException.class);
857+
when(sqsException.statusCode()).thenReturn(404);
858+
when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenThrow(sqsException);
859+
860+
final Counter queueNotFoundCounter = mock(Counter.class);
861+
when(pluginMetrics.counter(SqsWorker.SQS_RESOURCE_NOT_FOUND_METRIC_NAME)).thenReturn(queueNotFoundCounter);
862+
lenient().when(pluginMetrics.counter(anyString())).thenReturn(mock(Counter.class));
863+
864+
final SqsWorker sqsWorker = createObjectUnderTest();
865+
sqsWorker.run();
866+
867+
verify(queueNotFoundCounter).increment();
868+
}
869+
870+
@Test
871+
void run_increments_throttled_metric_on_throttling_exception() {
872+
final SqsException sqsException = mock(SqsException.class);
873+
when(sqsException.isThrottlingException()).thenReturn(true);
874+
when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenThrow(sqsException);
875+
876+
final Counter throttledCounter = mock(Counter.class);
877+
when(pluginMetrics.counter(SqsWorker.SQS_MESSAGE_THROTTLED_METRIC_NAME)).thenReturn(throttledCounter);
878+
lenient().when(pluginMetrics.counter(anyString())).thenReturn(mock(Counter.class));
879+
880+
final SqsWorker sqsWorker = createObjectUnderTest();
881+
sqsWorker.run();
882+
883+
verify(throttledCounter).increment();
884+
}
885+
836886
private static String createPutNotification(final Instant startTime) {
837887
return createEventNotification("ObjectCreated:Put", startTime);
838888
}

data-prepper-plugins/sqs-common/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/common/SqsWorkerCommon.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public class SqsWorkerCommon {
4848
public static final String SQS_VISIBILITY_TIMEOUT_CHANGE_FAILED_COUNT_METRIC_NAME = "sqsVisibilityTimeoutChangeFailedCount";
4949
public static final String SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME = "sqsMessagesAccessDenied";
5050
public static final String SQS_MESSAGE_THROTTLED_METRIC_NAME = "sqsMessagesThrottled";
51-
public static final String SQS_QUEUE_NOT_FOUND_METRIC_NAME = "sqsQueueNotFound";
51+
public static final String SQS_RESOURCE_NOT_FOUND_METRIC_NAME = "sqsResourceNotFound";
5252

5353
private final Backoff standardBackoff;
5454
private final PluginMetrics pluginMetrics;
@@ -64,7 +64,7 @@ public class SqsWorkerCommon {
6464
private final Counter sqsVisibilityTimeoutChangeFailedCount;
6565
private final Counter sqsMessageAccessDeniedCounter;
6666
private final Counter sqsMessageThrottledCounter;
67-
private final Counter sqsQueueNotFoundCounter;
67+
private final Counter sqsResourceNotFoundCounter;
6868

6969
public SqsWorkerCommon(final Backoff standardBackoff,
7070
final PluginMetrics pluginMetrics,
@@ -86,7 +86,7 @@ public SqsWorkerCommon(final Backoff standardBackoff,
8686
sqsVisibilityTimeoutChangeFailedCount = pluginMetrics.counter(SQS_VISIBILITY_TIMEOUT_CHANGE_FAILED_COUNT_METRIC_NAME);
8787
sqsMessageAccessDeniedCounter = pluginMetrics.counter(SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME);
8888
sqsMessageThrottledCounter = pluginMetrics.counter(SQS_MESSAGE_THROTTLED_METRIC_NAME);
89-
sqsQueueNotFoundCounter = pluginMetrics.counter(SQS_QUEUE_NOT_FOUND_METRIC_NAME);
89+
sqsResourceNotFoundCounter = pluginMetrics.counter(SQS_RESOURCE_NOT_FOUND_METRIC_NAME);
9090
}
9191

9292
public List<Message> pollSqsMessages(final String queueUrl,
@@ -238,7 +238,7 @@ public void recordSqsException(final AwsServiceException e) {
238238
} else if (e.statusCode() == 404 ||
239239
e instanceof QueueDoesNotExistException ||
240240
e instanceof KmsNotFoundException) {
241-
sqsQueueNotFoundCounter.increment();
241+
sqsResourceNotFoundCounter.increment();
242242
} else if (e.isThrottlingException() ||
243243
e instanceof KmsThrottledException) {
244244
sqsMessageThrottledCounter.increment();

data-prepper-plugins/sqs-common/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/common/SqsWorkerCommonTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -251,17 +251,17 @@ void testRecordSqsException_KmsAccessDeniedException() {
251251
void testRecordSqsException_404Status() {
252252
final SqsException sqsException = mock(SqsException.class);
253253
when(sqsException.statusCode()).thenReturn(404);
254-
testRecordSqsException(sqsException, SqsWorkerCommon.SQS_QUEUE_NOT_FOUND_METRIC_NAME);
254+
testRecordSqsException(sqsException, SqsWorkerCommon.SQS_RESOURCE_NOT_FOUND_METRIC_NAME);
255255
}
256256

257257
@Test
258258
void testRecordSqsException_QueueDoesNotExistException() {
259-
testRecordSqsException(mock(QueueDoesNotExistException.class), SqsWorkerCommon.SQS_QUEUE_NOT_FOUND_METRIC_NAME);
259+
testRecordSqsException(mock(QueueDoesNotExistException.class), SqsWorkerCommon.SQS_RESOURCE_NOT_FOUND_METRIC_NAME);
260260
}
261261

262262
@Test
263263
void testRecordSqsException_KmsNotFoundException() {
264-
testRecordSqsException(mock(KmsNotFoundException.class), SqsWorkerCommon.SQS_QUEUE_NOT_FOUND_METRIC_NAME);
264+
testRecordSqsException(mock(KmsNotFoundException.class), SqsWorkerCommon.SQS_RESOURCE_NOT_FOUND_METRIC_NAME);
265265
}
266266

267267
@Test

data-prepper-plugins/sqs-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/sqs/SqsMetricsIT.java

Lines changed: 0 additions & 80 deletions
This file was deleted.

0 commit comments

Comments
 (0)