Skip to content

Commit 3553b8a

Browse files
authored
Merge pull request #511 from DEFRA/feat/df-1006-rework
feat/df-1006: DLQ rework
2 parents 32dec53 + 74c9be5 commit 3553b8a

5 files changed

Lines changed: 319 additions & 98 deletions

File tree

src/api/types.js

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,8 @@
6161

6262
/**
6363
* @typedef {{ Params: { referenceNumber: string }}} GetSubmissionByReference
64-
* @typedef {{ Params: { dlq: string }}} DeadLetterQueueRequest
65-
*/
66-
67-
/**
68-
* @typedef {{ Params: { dlq: string, messageId: string }}} DeadLetterQueueAndHandleRequest
64+
* @typedef {{ Params: { dlq: string }, Query: { visibilityTimeout?: number, waitTimeSeconds?: number }}} DeadLetterQueueRequest
65+
* @typedef {{ Params: { dlq: string, messageId: string }, Query: { visibilityTimeout?: number, waitTimeSeconds?: number }}} DeadLetterQueueMessageRequest
6966
*/
7067

7168
/**

src/messaging/event.js

Lines changed: 119 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import {
22
DeleteMessageCommand,
33
ReceiveMessageCommand,
4+
SendMessageCommand,
45
StartMessageMoveTaskCommand
56
} from '@aws-sdk/client-sqs'
67

@@ -11,8 +12,12 @@ import { sqsClient } from '~/src/tasks/sqs.js'
1112
export const receiveMessageTimeout = config.get('receiveMessageTimeout')
1213

1314
const maxNumberOfMessages = config.get('maxNumberOfMessages')
14-
const visibilityTimeout = config.get('visibilityTimeout')
15+
const pollingVisibilityTimeout = config.get('visibilityTimeout')
1516

17+
const MAX_RETRIES = 7
18+
const RETRY_WAIT_BETWEEN_TRIES_IN_SECS = 1
19+
const DEFAULT_VISIBILITY_TIMEOUT = 3
20+
const DEFAULT_WAIT_TIME_IN_SECS = 3
1621
/**
1722
* @param {string} dlqName
1823
*/
@@ -34,7 +39,7 @@ export function receiveMessages(queueUrl) {
3439
const input = {
3540
QueueUrl: queueUrl,
3641
MaxNumberOfMessages: maxNumberOfMessages,
37-
VisibilityTimeout: visibilityTimeout
42+
VisibilityTimeout: pollingVisibilityTimeout
3843
}
3944

4045
const command = new ReceiveMessageCommand(input)
@@ -46,17 +51,71 @@ export function receiveMessages(queueUrl) {
4651
* @param {string} dlq - the SQS deadletter queue identifier
4752
* @returns {Promise<ReceiveMessageResult>}
4853
*/
49-
export function receiveDlqMessages(dlq) {
54+
export function receiveDlqMessages(
55+
dlq,
56+
visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT,
57+
waitTimeSeconds = DEFAULT_WAIT_TIME_IN_SECS
58+
) {
5059
const queueUrl = getDeadLetterQueueUrl(dlq)
5160
const command = new ReceiveMessageCommand({
5261
QueueUrl: queueUrl,
5362
MaxNumberOfMessages: 10,
54-
VisibilityTimeout: 0,
55-
WaitTimeSeconds: 0
63+
VisibilityTimeout: visibilityTimeout,
64+
WaitTimeSeconds: waitTimeSeconds
5665
})
5766
return sqsClient.send(command)
5867
}
5968

69+
/**
70+
* Get a specific message from the dead-letter queue. Handles retries if not found.
71+
* @param {string} dlq - the SQS deadletter queue identifier
72+
* @param {string} messageId
73+
* @param {number} [visibilityTimeout] - Queue visibilityTimeout
74+
* @param {number} [waitTimeSeconds] - Queue waitTimeSeconds
75+
* @returns {Promise< Message | null >}
76+
*/
77+
export async function getDlqMessage(
78+
dlq,
79+
messageId,
80+
visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT,
81+
waitTimeSeconds = DEFAULT_WAIT_TIME_IN_SECS
82+
) {
83+
let attempts = 1
84+
85+
while (attempts <= MAX_RETRIES) {
86+
const messageResponse = await receiveDlqMessages(
87+
dlq,
88+
visibilityTimeout,
89+
waitTimeSeconds
90+
)
91+
const messages = messageResponse.Messages ?? []
92+
for (const m of messages) {
93+
logger.info(
94+
`[DLQ] Received message with id ${m.MessageId} on attempt ${attempts}`
95+
)
96+
}
97+
98+
const messageFound = messages.find((m) => m.MessageId === messageId)
99+
if (messageFound) {
100+
logger.info(
101+
`[DLQ] Found message with id ${messageId} on attempt ${attempts}`
102+
)
103+
return messageFound
104+
}
105+
106+
logger.info(
107+
`[DLQ] Message ${messageId} not found in batch ${attempts}, retrying...`
108+
)
109+
110+
await new Promise((resolve) =>
111+
setTimeout(resolve, RETRY_WAIT_BETWEEN_TRIES_IN_SECS * 1000)
112+
)
113+
114+
attempts++
115+
}
116+
return null
117+
}
118+
60119
/**
61120
* Redrive all messages from the dead-letter queue to the main queue
62121
* @param {string} dlq - the SQS deadletter queue ARN
@@ -73,44 +132,70 @@ export function redriveDlqMessages(dlq) {
73132
return sqsClient.send(command)
74133
}
75134

135+
/**
136+
* Submit the specified message to the main queue
137+
* @param {string} dlq - the SQS deadletter queue identifier
138+
* @param {string} messageId
139+
* @param {string} messageJson
140+
*/
141+
export async function resubmitDlqMessage(dlq, messageId, messageJson) {
142+
try {
143+
logger.info(
144+
`[DLQ] Submitting new message in place of message id ${messageId}`
145+
)
146+
147+
const command = new SendMessageCommand({
148+
QueueUrl: getDeadLetterQueueUrl(dlq),
149+
MessageBody: messageJson
150+
})
151+
const sendResult = await sqsClient.send(command)
152+
logger.info(
153+
`[DLQ] Submitting new message in place of message id ${messageId}. New message id is ${sendResult.MessageId}`
154+
)
155+
} catch (err) {
156+
logger.error(
157+
err,
158+
`[DLQ] Failed to submit new message to main queue based on old message of id ${messageId} from DLQ ${dlq}`
159+
)
160+
throw err
161+
}
162+
}
163+
76164
/**
77165
* Delete DLQ message by messageId
78166
* This has to be done as a combined 'read then delete' (while using a visibility timeout of non-zero)
79-
* otherwise the receipt handles become stale and the delete operation doesn't work.
80-
* @param {string} dlq - the SQS deadletter queue ARN
167+
* otherwise the receipt handle becomes stale and the delete operation doesn't work.
168+
* getDlqMessage uses retries in case the message is not always visibile when querying the DLQ.
169+
* @param {string} dlq - the SQS deadletter queue identifier
81170
* @param {string} messageId
171+
* @param {number} [visibilityTimeout] - Queue visibilityTimeout
172+
* @param {number} [waitTimeSeconds] - Queue waitTimeSeconds
82173
*/
83-
export async function deleteDlqMessage(dlq, messageId) {
84-
const queueUrl = getDeadLetterQueueUrl(dlq)
85-
const receiveCommand = new ReceiveMessageCommand({
86-
QueueUrl: queueUrl,
87-
MaxNumberOfMessages: 10,
88-
VisibilityTimeout: 2,
89-
WaitTimeSeconds: 0
90-
})
91-
const messageResponse = await sqsClient.send(receiveCommand)
92-
93-
const messages = messageResponse.Messages
94-
? messageResponse.Messages.filter((m) => m.MessageId === messageId)
95-
: undefined
96-
if (!messages?.length) {
97-
const errorText = `Message with id ${messageId} not found in submissions-api (${dlq}) DLQ`
174+
export async function deleteDlqMessage(
175+
dlq,
176+
messageId,
177+
visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT,
178+
waitTimeSeconds = DEFAULT_WAIT_TIME_IN_SECS
179+
) {
180+
const foundMessage = await getDlqMessage(
181+
dlq,
182+
messageId,
183+
visibilityTimeout,
184+
waitTimeSeconds
185+
)
186+
if (!foundMessage) {
187+
const errorText = `Message with id ${messageId} not found in ${dlq} DLQ after ${MAX_RETRIES} attempts`
98188
logger.info(errorText)
99189
throw new Error(errorText)
100190
}
101191

102-
logger.info(
103-
`[DLQ] Number of messages found with id ${messageId}: ${messages.length}`
104-
)
105-
for (const message of messages) {
106-
const deleteCommand = new DeleteMessageCommand({
107-
QueueUrl: queueUrl,
108-
ReceiptHandle: message.ReceiptHandle
109-
})
110-
logger.info(`[DLQ] Deleting message with id ${messageId}`)
111-
await sqsClient.send(deleteCommand)
112-
logger.info(`[DLQ] Deleted message with id ${messageId}`)
113-
}
192+
const deleteCommand = new DeleteMessageCommand({
193+
QueueUrl: getDeadLetterQueueUrl(dlq),
194+
ReceiptHandle: foundMessage.ReceiptHandle
195+
})
196+
logger.info(`[DLQ] Deleting message with id ${messageId}`)
197+
await sqsClient.send(deleteCommand)
198+
logger.info(`[DLQ] Deleted message with id ${messageId}`)
114199
}
115200

116201
/**

0 commit comments

Comments
 (0)