Skip to content

Commit 888b9ce

Browse files
committed
Added metrics to SQS notification queue for S3 source
Signed-off-by: Manuel Mangas Zurita <mzurita@amazon.com>
1 parent 1b4979c commit 888b9ce

3 files changed

Lines changed: 92 additions & 80 deletions

File tree

  • data-prepper-plugins
    • s3-source/src
    • sqs-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/sqs

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@
3333
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
3434
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
3535
import software.amazon.awssdk.services.sqs.model.SqsException;
36+
import software.amazon.awssdk.awscore.exception.AwsServiceException;
37+
import software.amazon.awssdk.services.sqs.model.KmsAccessDeniedException;
38+
import software.amazon.awssdk.services.sqs.model.KmsNotFoundException;
39+
import software.amazon.awssdk.services.sqs.model.KmsThrottledException;
40+
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
3641
import software.amazon.awssdk.services.sts.model.StsException;
3742

3843
import java.time.Duration;
@@ -58,6 +63,9 @@ public class SqsWorker implements Runnable {
5863
static final String SQS_VISIBILITY_TIMEOUT_CHANGED_COUNT_METRIC_NAME = "sqsVisibilityTimeoutChangedCount";
5964
static final String SQS_VISIBILITY_TIMEOUT_CHANGE_FAILED_COUNT_METRIC_NAME = "sqsVisibilityTimeoutChangeFailedCount";
6065
static final String ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME = "acknowledgementSetCallbackCounter";
66+
public static final String SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME = "sqsMessagesAccessDenied";
67+
public static final String SQS_MESSAGE_THROTTLED_METRIC_NAME = "sqsMessagesThrottled";
68+
public static final String SQS_QUEUE_NOT_FOUND_METRIC_NAME = "sqsQueueNotFound";
6169

6270
private final S3SourceConfig s3SourceConfig;
6371
private final SqsClient sqsClient;
@@ -74,6 +82,9 @@ public class SqsWorker implements Runnable {
7482
private final Counter acknowledgementSetCallbackCounter;
7583
private final Counter sqsVisibilityTimeoutChangedCount;
7684
private final Counter sqsVisibilityTimeoutChangeFailedCount;
85+
private final Counter sqsMessageAccessDeniedCounter;
86+
private final Counter sqsMessageThrottledCounter;
87+
private final Counter sqsQueueNotFoundCounter;
7788
private final Timer sqsMessageDelayTimer;
7889
private final Backoff standardBackoff;
7990
private final SqsMessageParser sqsMessageParser;
@@ -111,6 +122,9 @@ public SqsWorker(final AcknowledgementSetManager acknowledgementSetManager,
111122
acknowledgementSetCallbackCounter = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME);
112123
sqsVisibilityTimeoutChangedCount = pluginMetrics.counter(SQS_VISIBILITY_TIMEOUT_CHANGED_COUNT_METRIC_NAME);
113124
sqsVisibilityTimeoutChangeFailedCount = pluginMetrics.counter(SQS_VISIBILITY_TIMEOUT_CHANGE_FAILED_COUNT_METRIC_NAME);
125+
sqsMessageAccessDeniedCounter = pluginMetrics.counter(SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME);
126+
sqsMessageThrottledCounter = pluginMetrics.counter(SQS_MESSAGE_THROTTLED_METRIC_NAME);
127+
sqsQueueNotFoundCounter = pluginMetrics.counter(SQS_QUEUE_NOT_FOUND_METRIC_NAME);
114128
}
115129

116130
@Override
@@ -164,6 +178,7 @@ private List<Message> getMessagesFromSqs() {
164178
return messages;
165179
} catch (final SqsException | StsException e) {
166180
LOG.error("Error reading from SQS: {}. Retrying with exponential backoff.", e.getMessage());
181+
recordSqsException(e);
167182
sqsReceiveMessagesFailedCounter.increment();
168183
applyBackoff();
169184
return Collections.emptyList();
@@ -452,4 +467,21 @@ private int getApproximateReceiveCount(final Message message) {
452467
void stop() {
453468
isStopped = true;
454469
}
470+
471+
public void recordSqsException(final AwsServiceException e) {
472+
// AWS SQS emits some of their exceptions without the matching HTTP code. As we want to generate an aggregate version of
473+
// these exceptions, we have to explicitly catch the type alongside the status code for the ones that leverage the status
474+
// code (i.e. InvalidAddressException)
475+
if (e.statusCode() == 403 ||
476+
e instanceof KmsAccessDeniedException) {
477+
sqsMessageAccessDeniedCounter.increment();
478+
} else if (e.statusCode() == 404 ||
479+
e instanceof QueueDoesNotExistException ||
480+
e instanceof KmsNotFoundException) {
481+
sqsQueueNotFoundCounter.increment();
482+
} else if (e.isThrottlingException() ||
483+
e instanceof KmsThrottledException) {
484+
sqsMessageThrottledCounter.increment();
485+
}
486+
}
455487
}

data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.mockito.InOrder;
2424
import org.mockito.Mock;
2525
import org.mockito.junit.jupiter.MockitoExtension;
26+
import org.mockito.junit.jupiter.MockitoSettings;
27+
import org.mockito.quality.Strictness;
2628
import org.opensearch.dataprepper.metrics.PluginMetrics;
2729
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
2830
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
@@ -45,6 +47,11 @@
4547
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
4648
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
4749
import software.amazon.awssdk.services.sqs.model.SqsException;
50+
import software.amazon.awssdk.awscore.exception.AwsServiceException;
51+
import software.amazon.awssdk.services.sqs.model.KmsAccessDeniedException;
52+
import software.amazon.awssdk.services.sqs.model.KmsNotFoundException;
53+
import software.amazon.awssdk.services.sqs.model.KmsThrottledException;
54+
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
4855
import software.amazon.awssdk.services.sts.model.StsException;
4956

5057
import java.io.IOException;
@@ -92,6 +99,7 @@
9299
import static org.opensearch.dataprepper.plugins.source.s3.SqsWorker.SQS_VISIBILITY_TIMEOUT_CHANGED_COUNT_METRIC_NAME;
93100

94101
@ExtendWith(MockitoExtension.class)
102+
@MockitoSettings(strictness = Strictness.LENIENT)
95103
class SqsWorkerTest {
96104
private SqsClient sqsClient;
97105
private S3Service s3Service;
@@ -866,6 +874,58 @@ private static String createEventBridgeNotification(final Instant startTime) {
866874
"\"reason\":\"PutObject\"}}";
867875
}
868876

877+
@Test
878+
void testRecordSqsException_KmsThrottledException() {
879+
testRecordSqsException(mock(KmsThrottledException.class), SqsWorker.SQS_MESSAGE_THROTTLED_METRIC_NAME);
880+
}
881+
882+
@Test
883+
void testRecordSqsException_403Status() {
884+
final SqsException sqsException = mock(SqsException.class);
885+
when(sqsException.statusCode()).thenReturn(403);
886+
testRecordSqsException(sqsException, SqsWorker.SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME);
887+
}
888+
889+
@Test
890+
void testRecordSqsException_KmsAccessDeniedException() {
891+
testRecordSqsException(mock(KmsAccessDeniedException.class), SqsWorker.SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME);
892+
}
893+
894+
@Test
895+
void testRecordSqsException_404Status() {
896+
final SqsException sqsException = mock(SqsException.class);
897+
when(sqsException.statusCode()).thenReturn(404);
898+
testRecordSqsException(sqsException, SqsWorker.SQS_QUEUE_NOT_FOUND_METRIC_NAME);
899+
}
900+
901+
@Test
902+
void testRecordSqsException_QueueDoesNotExistException() {
903+
testRecordSqsException(mock(QueueDoesNotExistException.class), SqsWorker.SQS_QUEUE_NOT_FOUND_METRIC_NAME);
904+
}
905+
906+
@Test
907+
void testRecordSqsException_KmsNotFoundException() {
908+
testRecordSqsException(mock(KmsNotFoundException.class), SqsWorker.SQS_QUEUE_NOT_FOUND_METRIC_NAME);
909+
}
910+
911+
@Test
912+
void testRecordSqsException_isThrottlingException() {
913+
final SqsException sqsException = mock(SqsException.class);
914+
when(sqsException.isThrottlingException()).thenReturn(true);
915+
testRecordSqsException(sqsException, SqsWorker.SQS_MESSAGE_THROTTLED_METRIC_NAME);
916+
}
917+
918+
private void testRecordSqsException(AwsServiceException exception, String metricName) {
919+
final Counter expectedCounter = mock(Counter.class);
920+
when(pluginMetrics.counter(metricName)).thenReturn(expectedCounter);
921+
final SqsWorker testWorker = createObjectUnderTest();
922+
923+
testWorker.recordSqsException(exception);
924+
925+
verify(expectedCounter).increment();
926+
verifyNoMoreInteractions(expectedCounter);
927+
}
928+
869929
private static String createSecurityLakeNotification(final Instant startTime) {
870930
return "{\"source\":\"aws.s3\",\"time\":\"" + startTime + "\",\"account\":\"123456789012\",\"region\":\"ca-central-1\"," +
871931
"\"resources\":[\"arn:aws:s3:::example-bucket\"],\"detail\":{\"bucket\":{\"name\":\"example-bucket\"}," +

data-prepper-plugins/sqs-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/sqs/SqsMetricsIT.java

Lines changed: 0 additions & 80 deletions
This file was deleted.

0 commit comments

Comments
 (0)