-
Notifications
You must be signed in to change notification settings - Fork 149
Expand file tree
/
Copy pathAmazonSQSAsyncMessagingClientWrapper.java
More file actions
394 lines (363 loc) · 16.7 KB
/
AmazonSQSAsyncMessagingClientWrapper.java
File metadata and controls
394 lines (363 loc) · 16.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
package com.amazon.sqs.javamessaging;
import jakarta.jms.InvalidDestinationException;
import jakarta.jms.JMSException;
import jakarta.jms.JMSSecurityException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.awscore.AwsRequest;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
import software.amazon.awssdk.services.sqs.model.CreateQueueResponse;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
import java.util.Set;
/**
* This is a JMS Wrapper of <code>SqsAsyncClient</code>. This class changes all
* <code>AwsServiceException</code> and <code>SdkException</code> into
* JMSException/JMSSecurityException.
*/
public class AmazonSQSAsyncMessagingClientWrapper implements AmazonSQSMessagingClient {
private static final Logger LOG = LoggerFactory.getLogger(AmazonSQSAsyncMessagingClientWrapper.class);
/**
* List of exceptions that can classified as security. These exceptions
* are not thrown during connection-set-up rather after the service
* calls of the <code>SqsAsyncClient</code>.
*/
private static final Set<String> SECURITY_EXCEPTION_ERROR_CODES = Set.of("MissingClientTokenId",
"InvalidClientTokenId", "MissingAuthenticationToken", "AccessDenied");
private final SqsAsyncClient amazonSQSClient;
private final AwsCredentialsProvider credentialsProvider;
/**
* @param amazonSQSClient The AWS SDK Client for SQS.
* @throws JMSException if the client is null
*/
public AmazonSQSAsyncMessagingClientWrapper(SqsAsyncClient amazonSQSClient) throws JMSException {
this(amazonSQSClient, null);
}
/**
* @param amazonSQSClient The AWS SDK Client for SQS.
* @throws JMSException if the client is null
*/
public AmazonSQSAsyncMessagingClientWrapper(SqsAsyncClient amazonSQSClient, AwsCredentialsProvider credentialsProvider) throws JMSException {
if (amazonSQSClient == null) {
throw new JMSException("Amazon SQS client cannot be null");
}
this.amazonSQSClient = amazonSQSClient;
this.credentialsProvider = credentialsProvider;
}
/**
* If one uses any other AWS SDK operations other than explicitly listed
* here, the exceptions thrown by those operations will not be wrapped as
* <code>JMSException</code>.
*
* @return amazonSQSClient
*/
@Override
public SqsAsyncClient getAmazonSQSClient() {
return amazonSQSClient;
}
/**
* Calls <code>deleteMessage</code> and wraps <code>SdkException</code>. This is used to
* acknowledge single messages, so that they can be deleted from SQS queue.
*
* @param deleteMessageRequest Container for the necessary parameters to execute the
* deleteMessage service method on SqsAsyncClient.
* @throws JMSException
*/
@Override
public void deleteMessage(DeleteMessageRequest deleteMessageRequest) throws JMSException {
try {
amazonSQSClient.deleteMessage(prepareRequest(deleteMessageRequest));
} catch (SdkException e) {
throw handleException(e, "deleteMessage");
}
}
/**
* Calls <code>deleteMessageBatch</code> and wraps
* <code>SdkException</code>. This is used to acknowledge multiple
* messages on client_acknowledge mode, so that they can be deleted from SQS
* queue.
*
* @param deleteMessageBatchRequest Container for the necessary parameters to execute the
* deleteMessageBatch service method on SqsAsyncClient. This is the
* batch version of deleteMessage. Max batch size is 10.
* @return The response from the deleteMessageBatch service method, as
* returned by SqsAsyncClient
* @throws JMSException
*/
@Override
public DeleteMessageBatchResponse deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws JMSException {
try {
return amazonSQSClient.deleteMessageBatch(prepareRequest(deleteMessageBatchRequest)).join();
} catch (SdkException e) {
throw handleException(e, "deleteMessageBatch");
}
}
/**
* Calls <code>sendMessage</code> and wraps
* <code>AmazonClientException</code>.
*
* @param sendMessageRequest Container for the necessary parameters to execute the
* sendMessage service method on SqsAsyncClient.
* @return The response from the sendMessage service method, as returned by
* SqsAsyncClient
* @throws JMSException
*/
@Override
public SendMessageResponse sendMessage(SendMessageRequest sendMessageRequest) throws JMSException {
try {
return amazonSQSClient.sendMessage(prepareRequest(sendMessageRequest)).join();
} catch (SdkException e) {
throw handleException(e, "sendMessage");
}
}
/**
* Check if the requested queue exists. This function calls
* <code>GetQueueUrl</code> for the given queue name, returning true on
* success, false if it gets <code>QueueDoesNotExistException</code>.
*
* @param queueName the queue to check
* @return true if the queue exists, false if it doesn't.
* @throws JMSException
*/
@Override
public boolean queueExists(String queueName) throws JMSException {
try {
GetQueueUrlRequest getQueueUrlRequest = GetQueueUrlRequest.builder().queueName(queueName).build();
amazonSQSClient.getQueueUrl(prepareRequest(getQueueUrlRequest));
return true;
} catch (QueueDoesNotExistException e) {
return false;
} catch (SdkException e) {
throw handleException(e, "getQueueUrl");
}
}
/**
* Check if the requested queue exists. This function calls
* <code>GetQueueUrl</code> for the given queue name with the given owner
* accountId, returning true on success, false if it gets
* <code>QueueDoesNotExistException</code>.
*
* @param queueName the queue to check
* @param queueOwnerAccountId The AWS accountId of the account that created the queue
* @return true if the queue exists, false if it doesn't.
* @throws JMSException
*/
@Override
public boolean queueExists(String queueName, String queueOwnerAccountId) throws JMSException {
try {
GetQueueUrlRequest getQueueUrlRequest = GetQueueUrlRequest.builder()
.queueName(queueName)
.queueOwnerAWSAccountId(queueOwnerAccountId)
.build();
amazonSQSClient.getQueueUrl(prepareRequest(getQueueUrlRequest));
return true;
} catch (QueueDoesNotExistException e) {
return false;
} catch (SdkException e) {
throw handleException(e, "getQueueUrl");
}
}
/**
* Gets the queueUrl of a queue given a queue name.
*
* @param queueName
* @return The response from the GetQueueUrl service method, as returned by
* SqsAsyncClient, which will include queue`s URL
* @throws JMSException
*/
@Override
public GetQueueUrlResponse getQueueUrl(String queueName) throws JMSException {
GetQueueUrlRequest request = GetQueueUrlRequest.builder()
.queueName(queueName)
.build();
return getQueueUrl(request);
}
/**
* Gets the queueUrl of a queue given a queue name owned by the provided accountId.
*
* @param queueName
* @param queueOwnerAccountId The AWS accountId of the account that created the queue
* @return The response from the GetQueueUrl service method, as returned by
* SqsAsyncClient, which will include queue`s URL
* @throws JMSException
*/
@Override
public GetQueueUrlResponse getQueueUrl(String queueName, String queueOwnerAccountId) throws JMSException {
GetQueueUrlRequest request = GetQueueUrlRequest.builder()
.queueName(queueName)
.queueOwnerAWSAccountId(queueOwnerAccountId)
.build();
return getQueueUrl(request);
}
/**
* Calls <code>getQueueUrl</code> and wraps <code>SdkException</code>
*
* @param getQueueUrlRequest Container for the necessary parameters to execute the
* getQueueUrl service method on SqsAsyncClient.
* @return The response from the GetQueueUrl service method, as returned by
* SqsAsyncClient, which will include queue`s URL
* @throws JMSException
*/
@Override
public GetQueueUrlResponse getQueueUrl(GetQueueUrlRequest getQueueUrlRequest) throws JMSException {
try {
return amazonSQSClient.getQueueUrl(prepareRequest(getQueueUrlRequest)).join();
} catch (SdkException e) {
throw handleException(e, "getQueueUrl");
}
}
/**
* Calls <code>createQueue</code> to create the queue with the default queue attributes,
* and wraps <code>SdkException</code>
*
* @param queueName
* @return The response from the createQueue service method, as returned by
* SqsAsyncClient. This call creates a new queue, or returns the URL of
* an existing one.
* @throws JMSException
*/
@Override
public CreateQueueResponse createQueue(String queueName) throws JMSException {
return createQueue(CreateQueueRequest.builder().queueName(queueName).build());
}
/**
* Calls <code>createQueue</code> to create the queue with the provided queue attributes
* if any, and wraps <code>SdkException</code>
*
* @param createQueueRequest Container for the necessary parameters to execute the
* createQueue service method on SqsAsyncClient.
* @return The response from the createQueue service method, as returned by
* SqsAsyncClient. This call creates a new queue, or returns the URL of
* an existing one.
* @throws JMSException
*/
@Override
public CreateQueueResponse createQueue(CreateQueueRequest createQueueRequest) throws JMSException {
try {
return amazonSQSClient.createQueue(prepareRequest(createQueueRequest)).join();
} catch (SdkException e) {
throw handleException(e, "createQueue");
}
}
/**
* Calls <code>receiveMessage</code> and wraps <code>SdkException</code>. Used by
* {@link SQSMessageConsumerPrefetch} to receive up to minimum of
* (<code>numberOfMessagesToPrefetch</code>,10) messages from SQS queue into consumer
* prefetch buffers.
*
* @param receiveMessageRequest Container for the necessary parameters to execute the
* receiveMessage service method on SqsAsyncClient.
* @return The response from the ReceiveMessage service method, as returned
* by SqsAsyncClient.
* @throws JMSException
*/
@Override
public ReceiveMessageResponse receiveMessage(ReceiveMessageRequest receiveMessageRequest) throws JMSException {
try {
return amazonSQSClient.receiveMessage(prepareRequest(receiveMessageRequest)).join();
} catch (SdkException e) {
throw handleException(e, "receiveMessage");
}
}
/**
* Calls <code>changeMessageVisibility</code> and wraps <code>SdkException</code>. This is
* used to for negative acknowledge of a single message, so that messages can be received again without any delay.
*
* @param changeMessageVisibilityRequest Container for the necessary parameters to execute the
* changeMessageVisibility service method on SqsAsyncClient.
* @throws JMSException
*/
@Override
public void changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) throws JMSException {
try {
amazonSQSClient.changeMessageVisibility(prepareRequest(changeMessageVisibilityRequest));
} catch (SdkException e) {
throw handleException(e, "changeMessageVisibility");
}
}
/**
* Calls <code>changeMessageVisibilityBatch</code> and wraps <code>SdkException</code>. This is
* used to for negative acknowledge of messages in batch, so that messages
* can be received again without any delay.
*
* @param changeMessageVisibilityBatchRequest Container for the necessary parameters to execute the
* changeMessageVisibilityBatch service method on SqsClient.
* @return The response from the changeMessageVisibilityBatch service
* method, as returned by SqsClient.
* @throws JMSException
*/
@Override
public ChangeMessageVisibilityBatchResponse changeMessageVisibilityBatch(
ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws JMSException {
try {
return amazonSQSClient.changeMessageVisibilityBatch(prepareRequest(changeMessageVisibilityBatchRequest)).join();
} catch (SdkException e) {
throw handleException(e, "changeMessageVisibilityBatch");
}
}
/**
* Create generic error message for <code>AwsServiceException</code>. Message include
* Action, RequestId, HTTPStatusCode, and AmazonErrorCode.
*/
private String logAndGetAmazonServiceException(AwsServiceException ase, String action) {
String errorMessage = "AmazonServiceException: " + action + ". RequestId: " + ase.requestId() +
"\nHTTPStatusCode: " + ase.statusCode() + " AmazonErrorCode: " + errorCode(ase);
LOG.error(errorMessage, ase);
return errorMessage;
}
/**
* Create generic error message for <code>SdkException</code>. Message include
* Action.
*/
private String logAndGetAmazonClientException(SdkException ace, String action) {
String errorMessage = "AmazonClientException: " + action + ".";
LOG.error(errorMessage, ace);
return errorMessage;
}
private JMSException handleException(SdkException e, String operationName) {
JMSException jmsException;
if (e instanceof AwsServiceException se) {
if (e instanceof QueueDoesNotExistException) {
jmsException = new InvalidDestinationException(
logAndGetAmazonServiceException(se, operationName), errorCode(se));
} else if (isJMSSecurityException(se)) {
jmsException = new JMSSecurityException(
logAndGetAmazonServiceException(se, operationName), errorCode(se));
} else {
jmsException = new JMSException(
logAndGetAmazonServiceException(se, operationName), errorCode(se));
}
} else {
jmsException = new JMSException(logAndGetAmazonClientException(e, operationName));
}
jmsException.initCause(e);
return jmsException;
}
private static String errorCode(AwsServiceException e) {
return e.awsErrorDetails() != null && e.awsErrorDetails().errorCode() != null ? e.awsErrorDetails().errorCode() : "";
}
private static boolean isJMSSecurityException(AwsServiceException e) {
return SECURITY_EXCEPTION_ERROR_CODES.contains(errorCode(e));
}
private <T extends AwsRequest> T prepareRequest(T request) {
return credentialsProvider == null ? request : (T) request.toBuilder().overrideConfiguration(
AwsRequestOverrideConfiguration.builder().credentialsProvider(credentialsProvider).build())
.build();
}
}