55} from '@aws-sdk/client-sqs'
66
77import { config } from '~/src/config/index.js'
8+ import { createLogger } from '~/src/helpers/logging/logger.js'
89import { sqsClient } from '~/src/messaging/sqs.js'
910
1011export const receiveMessageTimeout = config . get ( 'receiveMessageTimeout' )
@@ -14,6 +15,8 @@ const deadLetterQueueArn = config.get('sqsEventsDlqArn')
1415const maxNumberOfMessages = config . get ( 'maxNumberOfMessages' )
1516const visibilityTimeout = config . get ( 'visibilityTimeout' )
1617
18+ const logger = createLogger ( )
19+
1720/**
1821 * @type {ReceiveMessageCommandInput }
1922 */
@@ -58,17 +61,41 @@ export function redriveDlqMessages() {
5861}
5962
6063/**
61- * Delete event message
62- * @param {string } receiptHandle
63- * @returns {Promise<DeleteMessageCommandOutput> }
64+ * Delete DLQ message by messageId
65+ * This has to be done as a combined 'read then delete' (while using a visibility timeout of non-zero)
66+ * otherwise the receipt handles become stale and the delete operation doesn't work.
67+ * @param {string } messageId
6468 */
65- export function deleteDlqMessage ( receiptHandle ) {
66- const command = new DeleteMessageCommand ( {
69+ export async function deleteDlqMessage ( messageId ) {
70+ const receiveCommand = new ReceiveMessageCommand ( {
6771 QueueUrl : deadLetterQueueUrl ,
68- ReceiptHandle : receiptHandle
72+ MaxNumberOfMessages : 10 ,
73+ VisibilityTimeout : 2 ,
74+ WaitTimeSeconds : 0
6975 } )
76+ const messageResponse = await sqsClient . send ( receiveCommand )
7077
71- return sqsClient . send ( command )
78+ const messages = messageResponse . Messages
79+ ? messageResponse . Messages . filter ( ( m ) => m . MessageId === messageId )
80+ : undefined
81+ if ( ! messages ?. length ) {
82+ const errorText = `Message with id ${ messageId } not found in notify-listener DLQ`
83+ logger . info ( errorText )
84+ throw new Error ( errorText )
85+ }
86+
87+ logger . info (
88+ `[DLQ] Number of messages found with id ${ messageId } : ${ messages . length } `
89+ )
90+ for ( const message of messages ) {
91+ const deleteCommand = new DeleteMessageCommand ( {
92+ QueueUrl : deadLetterQueueUrl ,
93+ ReceiptHandle : message . ReceiptHandle
94+ } )
95+ logger . info ( `[DLQ] Deleting message with id ${ messageId } ` )
96+ await sqsClient . send ( deleteCommand )
97+ logger . info ( `[DLQ] Deleted message with id ${ messageId } ` )
98+ }
7299}
73100
74101/**
0 commit comments