Skip to content

Commit 975a52a

Browse files
committed
Added metrics to SQS notification queue for S3 source
Signed-off-by: Manuel Mangas Zurita <mzurita@amazon.com>
1 parent 1b4979c commit 975a52a

3 files changed

Lines changed: 89 additions & 80 deletions

File tree

  • data-prepper-plugins
    • s3-source/src
    • sqs-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/sqs

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@
3333
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
3434
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
3535
import software.amazon.awssdk.services.sqs.model.SqsException;
36+
import software.amazon.awssdk.awscore.exception.AwsServiceException;
37+
import software.amazon.awssdk.services.sqs.model.KmsAccessDeniedException;
38+
import software.amazon.awssdk.services.sqs.model.KmsNotFoundException;
39+
import software.amazon.awssdk.services.sqs.model.KmsThrottledException;
40+
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
3641
import software.amazon.awssdk.services.sts.model.StsException;
3742

3843
import java.time.Duration;
@@ -58,6 +63,9 @@ public class SqsWorker implements Runnable {
5863
static final String SQS_VISIBILITY_TIMEOUT_CHANGED_COUNT_METRIC_NAME = "sqsVisibilityTimeoutChangedCount";
5964
static final String SQS_VISIBILITY_TIMEOUT_CHANGE_FAILED_COUNT_METRIC_NAME = "sqsVisibilityTimeoutChangeFailedCount";
6065
static final String ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME = "acknowledgementSetCallbackCounter";
66+
static final String SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME = "sqsMessagesAccessDenied";
67+
static final String SQS_MESSAGE_THROTTLED_METRIC_NAME = "sqsMessagesThrottled";
68+
static final String SQS_QUEUE_NOT_FOUND_METRIC_NAME = "sqsQueueNotFound";
6169

6270
private final S3SourceConfig s3SourceConfig;
6371
private final SqsClient sqsClient;
@@ -74,6 +82,9 @@ public class SqsWorker implements Runnable {
7482
private final Counter acknowledgementSetCallbackCounter;
7583
private final Counter sqsVisibilityTimeoutChangedCount;
7684
private final Counter sqsVisibilityTimeoutChangeFailedCount;
85+
private final Counter sqsMessageAccessDeniedCounter;
86+
private final Counter sqsMessageThrottledCounter;
87+
private final Counter sqsQueueNotFoundCounter;
7788
private final Timer sqsMessageDelayTimer;
7889
private final Backoff standardBackoff;
7990
private final SqsMessageParser sqsMessageParser;
@@ -111,6 +122,9 @@ public SqsWorker(final AcknowledgementSetManager acknowledgementSetManager,
111122
acknowledgementSetCallbackCounter = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME);
112123
sqsVisibilityTimeoutChangedCount = pluginMetrics.counter(SQS_VISIBILITY_TIMEOUT_CHANGED_COUNT_METRIC_NAME);
113124
sqsVisibilityTimeoutChangeFailedCount = pluginMetrics.counter(SQS_VISIBILITY_TIMEOUT_CHANGE_FAILED_COUNT_METRIC_NAME);
125+
sqsMessageAccessDeniedCounter = pluginMetrics.counter(SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME);
126+
sqsMessageThrottledCounter = pluginMetrics.counter(SQS_MESSAGE_THROTTLED_METRIC_NAME);
127+
sqsQueueNotFoundCounter = pluginMetrics.counter(SQS_QUEUE_NOT_FOUND_METRIC_NAME);
114128
}
115129

116130
@Override
@@ -164,6 +178,7 @@ private List<Message> getMessagesFromSqs() {
164178
return messages;
165179
} catch (final SqsException | StsException e) {
166180
LOG.error("Error reading from SQS: {}. Retrying with exponential backoff.", e.getMessage());
181+
recordSqsException(e);
167182
sqsReceiveMessagesFailedCounter.increment();
168183
applyBackoff();
169184
return Collections.emptyList();
@@ -452,4 +467,21 @@ private int getApproximateReceiveCount(final Message message) {
452467
void stop() {
453468
isStopped = true;
454469
}
470+
471+
private void recordSqsException(final AwsServiceException e) {
472+
// AWS SQS emits some of their exceptions without the matching HTTP code. As we want to generate an aggregate version of
473+
// these exceptions, we have to explicitly catch the type alongside the status code for the ones that leverage the status
474+
// code (i.e. InvalidAddressException)
475+
if (e.statusCode() == 403 ||
476+
e instanceof KmsAccessDeniedException) {
477+
sqsMessageAccessDeniedCounter.increment();
478+
} else if (e.statusCode() == 404 ||
479+
e instanceof QueueDoesNotExistException ||
480+
e instanceof KmsNotFoundException) {
481+
sqsQueueNotFoundCounter.increment();
482+
} else if (e.isThrottlingException() ||
483+
e instanceof KmsThrottledException) {
484+
sqsMessageThrottledCounter.increment();
485+
}
486+
}
455487
}

data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.mockito.InOrder;
2424
import org.mockito.Mock;
2525
import org.mockito.junit.jupiter.MockitoExtension;
26+
import org.mockito.junit.jupiter.MockitoSettings;
27+
import org.mockito.quality.Strictness;
2628
import org.opensearch.dataprepper.metrics.PluginMetrics;
2729
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
2830
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
@@ -45,6 +47,11 @@
4547
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
4648
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
4749
import software.amazon.awssdk.services.sqs.model.SqsException;
50+
import software.amazon.awssdk.awscore.exception.AwsServiceException;
51+
import software.amazon.awssdk.services.sqs.model.KmsAccessDeniedException;
52+
import software.amazon.awssdk.services.sqs.model.KmsNotFoundException;
53+
import software.amazon.awssdk.services.sqs.model.KmsThrottledException;
54+
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
4855
import software.amazon.awssdk.services.sts.model.StsException;
4956

5057
import java.io.IOException;
@@ -71,8 +78,10 @@
7178
import static org.junit.jupiter.params.provider.Arguments.arguments;
7279
import static org.mockito.ArgumentMatchers.any;
7380
import static org.mockito.ArgumentMatchers.anyInt;
81+
import static org.mockito.ArgumentMatchers.anyString;
7482
import static org.mockito.ArgumentMatchers.eq;
7583
import static org.mockito.Mockito.inOrder;
84+
import static org.mockito.Mockito.lenient;
7685
import static org.mockito.Mockito.mock;
7786
import static org.mockito.Mockito.never;
7887
import static org.mockito.Mockito.reset;
@@ -866,6 +875,54 @@ private static String createEventBridgeNotification(final Instant startTime) {
866875
"\"reason\":\"PutObject\"}}";
867876
}
868877

878+
@Test
879+
void run_increments_access_denied_metric_on_403_exception() {
880+
final SqsException sqsException = mock(SqsException.class);
881+
when(sqsException.statusCode()).thenReturn(403);
882+
when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenThrow(sqsException);
883+
884+
final Counter accessDeniedCounter = mock(Counter.class);
885+
when(pluginMetrics.counter(SqsWorker.SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME)).thenReturn(accessDeniedCounter);
886+
lenient().when(pluginMetrics.counter(anyString())).thenReturn(mock(Counter.class));
887+
888+
final SqsWorker sqsWorker = createObjectUnderTest();
889+
sqsWorker.run();
890+
891+
verify(accessDeniedCounter).increment();
892+
}
893+
894+
@Test
895+
void run_increments_queue_not_found_metric_on_404_exception() {
896+
final SqsException sqsException = mock(SqsException.class);
897+
when(sqsException.statusCode()).thenReturn(404);
898+
when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenThrow(sqsException);
899+
900+
final Counter queueNotFoundCounter = mock(Counter.class);
901+
when(pluginMetrics.counter(SqsWorker.SQS_QUEUE_NOT_FOUND_METRIC_NAME)).thenReturn(queueNotFoundCounter);
902+
lenient().when(pluginMetrics.counter(anyString())).thenReturn(mock(Counter.class));
903+
904+
final SqsWorker sqsWorker = createObjectUnderTest();
905+
sqsWorker.run();
906+
907+
verify(queueNotFoundCounter).increment();
908+
}
909+
910+
@Test
911+
void run_increments_throttled_metric_on_throttling_exception() {
912+
final SqsException sqsException = mock(SqsException.class);
913+
when(sqsException.isThrottlingException()).thenReturn(true);
914+
when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenThrow(sqsException);
915+
916+
final Counter throttledCounter = mock(Counter.class);
917+
when(pluginMetrics.counter(SqsWorker.SQS_MESSAGE_THROTTLED_METRIC_NAME)).thenReturn(throttledCounter);
918+
lenient().when(pluginMetrics.counter(anyString())).thenReturn(mock(Counter.class));
919+
920+
final SqsWorker sqsWorker = createObjectUnderTest();
921+
sqsWorker.run();
922+
923+
verify(throttledCounter).increment();
924+
}
925+
869926
private static String createSecurityLakeNotification(final Instant startTime) {
870927
return "{\"source\":\"aws.s3\",\"time\":\"" + startTime + "\",\"account\":\"123456789012\",\"region\":\"ca-central-1\"," +
871928
"\"resources\":[\"arn:aws:s3:::example-bucket\"],\"detail\":{\"bucket\":{\"name\":\"example-bucket\"}," +

data-prepper-plugins/sqs-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/sqs/SqsMetricsIT.java

Lines changed: 0 additions & 80 deletions
This file was deleted.

0 commit comments

Comments
 (0)