Skip to content

Commit 6aece30

Browse files
authored
Added queueName to SQS locator config (#255)
* Added queueName to SQS locator config * Adjusted locator config type * Changed versions update to minor * Updated sqs dependency version
1 parent 821ec99 commit 6aece30

10 files changed

Lines changed: 148 additions & 92 deletions

File tree

packages/sns/lib/utils/snsInitter.ts

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ import type { CreateTopicCommandInput, SNSClient } from '@aws-sdk/client-sns'
22
import type { CreateQueueCommandInput, SQSClient } from '@aws-sdk/client-sqs'
33
import type { DeletionConfig, ExtraParams } from '@message-queue-toolkit/core'
44
import { isProduction } from '@message-queue-toolkit/core'
5-
import type { SQSCreationConfig, SQSQueueLocatorType } from '@message-queue-toolkit/sqs'
5+
import {
6+
type SQSCreationConfig,
7+
resolveQueueUrlFromLocatorConfig,
8+
} from '@message-queue-toolkit/sqs'
69
import { deleteQueue, getQueueAttributes } from '@message-queue-toolkit/sqs'
710

811
import type { SNSCreationConfig, SNSTopicLocatorType } from '../sns/AbstractSnsService'
@@ -54,7 +57,7 @@ export async function initSnsSqs(
5457
}
5558

5659
const topicResolutionOptions: TopicResolutionOptions = {
57-
...locatorConfig,
60+
...(locatorConfig as SNSSQSQueueLocatorType),
5861
...creationConfig.topic,
5962
}
6063

@@ -86,11 +89,7 @@ export async function initSnsSqs(
8689
}
8790
}
8891

89-
if (!locatorConfig.queueUrl) {
90-
throw new Error(
91-
'If locatorConfig.subscriptionArn is provided, you have to also provide locatorConfig.queueUrl',
92-
)
93-
}
92+
const queueUrl = await resolveQueueUrlFromLocatorConfig(sqsClient, locatorConfig)
9493

9594
const checkPromises: Promise<Either<'not_found', unknown>>[] = []
9695
// Check for existing resources, using the locators
@@ -99,26 +98,21 @@ export async function initSnsSqs(
9998
const topicPromise = getTopicAttributes(snsClient, subscriptionTopicArn)
10099
checkPromises.push(topicPromise)
101100

102-
if (locatorConfig.queueUrl) {
103-
const queuePromise = getQueueAttributes(
104-
sqsClient,
105-
(locatorConfig as SQSQueueLocatorType).queueUrl,
106-
)
107-
checkPromises.push(queuePromise)
108-
}
101+
const queuePromise = getQueueAttributes(sqsClient, queueUrl)
102+
checkPromises.push(queuePromise)
109103

110104
const [topicCheckResult, queueCheckResult] = await Promise.all(checkPromises)
111105

112106
if (queueCheckResult?.error === 'not_found') {
113-
throw new Error(`Queue with queueUrl ${locatorConfig.queueUrl} does not exist.`)
107+
throw new Error(`Queue with queueUrl ${queueUrl} does not exist.`)
114108
}
115109
if (topicCheckResult.error === 'not_found') {
116110
throw new Error(`Topic with topicArn ${locatorConfig.topicArn} does not exist.`)
117111
}
118112

119113
let queueName: string
120-
if ((locatorConfig as SQSQueueLocatorType).queueUrl) {
121-
const splitUrl = (locatorConfig as SQSQueueLocatorType).queueUrl.split('/')
114+
if (queueUrl) {
115+
const splitUrl = queueUrl.split('/')
122116
queueName = splitUrl[splitUrl.length - 1]
123117
} else {
124118
queueName = creationConfig!.queue.QueueName!
@@ -127,7 +121,7 @@ export async function initSnsSqs(
127121
return {
128122
subscriptionArn: locatorConfig.subscriptionArn,
129123
topicArn: subscriptionTopicArn,
130-
queueUrl: locatorConfig.queueUrl,
124+
queueUrl,
131125
queueName,
132126
}
133127
}

packages/sns/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@message-queue-toolkit/sns",
3-
"version": "21.0.0",
3+
"version": "21.1.0",
44
"private": false,
55
"license": "MIT",
66
"description": "SNS adapter for message-queue-toolkit",
@@ -35,7 +35,7 @@
3535
"@aws-sdk/client-sts": "^3.632.0",
3636
"@message-queue-toolkit/core": ">=20.0.0",
3737
"@message-queue-toolkit/schemas": ">=2.0.0",
38-
"@message-queue-toolkit/sqs": ">=20.0.0"
38+
"@message-queue-toolkit/sqs": ">=20.1.0"
3939
},
4040
"devDependencies": {
4141
"@aws-sdk/client-s3": "^3.670.0",

packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts

Lines changed: 40 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ describe('SnsSqsPermissionConsumer', () => {
2020
const queueName = 'some-queue'
2121
const topicNome = 'some-topic'
2222

23+
const queueUrl = `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`
24+
2325
let diContainer: AwilixContainer<Dependencies>
2426
let sqsClient: SQSClient
2527
let snsClient: SNSClient
@@ -44,7 +46,7 @@ describe('SnsSqsPermissionConsumer', () => {
4446

4547
const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, {
4648
locatorConfig: {
47-
queueUrl: `http://s3.localhost.localstack.cloud:4566/000000000000/${queueName}`,
49+
queueUrl,
4850
subscriptionArn: 'dummy',
4951
topicArn: 'dummy',
5052
},
@@ -53,7 +55,7 @@ describe('SnsSqsPermissionConsumer', () => {
5355
await expect(() => newConsumer.init()).rejects.toThrow(/does not exist/)
5456
})
5557

56-
it('does not create a new queue when queue locator is passed', async () => {
58+
it('does not create a new queue when queue locator with url is passed', async () => {
5759
await assertQueue(sqsClient, {
5860
QueueName: queueName,
5961
})
@@ -65,16 +67,41 @@ describe('SnsSqsPermissionConsumer', () => {
6567
const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, {
6668
locatorConfig: {
6769
topicArn: arn,
68-
queueUrl: `http://s3.localhost.localstack.cloud:4566/000000000000/${queueName}`,
70+
queueUrl,
6971
subscriptionArn:
7072
'arn:aws:sns:eu-west-1:000000000000:user_permissions:bdf640a2-bedf-475a-98b8-758b88c87395',
7173
},
7274
})
7375

7476
await newConsumer.init()
75-
expect(newConsumer.subscriptionProps.queueUrl).toBe(
76-
`http://s3.localhost.localstack.cloud:4566/000000000000/${queueName}`,
77+
expect(newConsumer.subscriptionProps.queueUrl).toBe(queueUrl)
78+
expect(newConsumer.subscriptionProps.queueName).toBe(queueName)
79+
expect(newConsumer.subscriptionProps.topicArn).toEqual(arn)
80+
expect(newConsumer.subscriptionProps.subscriptionArn).toBe(
81+
'arn:aws:sns:eu-west-1:000000000000:user_permissions:bdf640a2-bedf-475a-98b8-758b88c87395',
7782
)
83+
})
84+
85+
it('does not create a new queue when queue locator with name is passed', async () => {
86+
await assertQueue(sqsClient, {
87+
QueueName: queueName,
88+
})
89+
90+
const arn = await assertTopic(snsClient, stsClient, {
91+
Name: topicNome,
92+
})
93+
94+
const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, {
95+
locatorConfig: {
96+
topicArn: arn,
97+
queueName,
98+
subscriptionArn:
99+
'arn:aws:sns:eu-west-1:000000000000:user_permissions:bdf640a2-bedf-475a-98b8-758b88c87395',
100+
},
101+
})
102+
103+
await newConsumer.init()
104+
expect(newConsumer.subscriptionProps.queueUrl).toBe(queueUrl)
78105
expect(newConsumer.subscriptionProps.queueName).toBe(queueName)
79106
expect(newConsumer.subscriptionProps.topicArn).toEqual(arn)
80107
expect(newConsumer.subscriptionProps.subscriptionArn).toBe(
@@ -99,9 +126,7 @@ describe('SnsSqsPermissionConsumer', () => {
99126
})
100127

101128
await newConsumer.init()
102-
expect(newConsumer.subscriptionProps.queueUrl).toBe(
103-
`http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`,
104-
)
129+
expect(newConsumer.subscriptionProps.queueUrl).toBe(queueUrl)
105130
expect(newConsumer.subscriptionProps.queueName).toBe(queueName)
106131
expect(newConsumer.subscriptionProps.topicArn).toEqual(arn)
107132
expect(newConsumer.subscriptionProps.subscriptionArn).toMatch(
@@ -151,9 +176,7 @@ describe('SnsSqsPermissionConsumer', () => {
151176
})
152177

153178
await newConsumer.init()
154-
expect(newConsumer.subscriptionProps.queueUrl).toBe(
155-
`http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`,
156-
)
179+
expect(newConsumer.subscriptionProps.queueUrl).toBe(queueUrl)
157180
expect(newConsumer.subscriptionProps.queueName).toBe(queueName)
158181

159182
const updateCall = sqsSpy.mock.calls.find((entry) => {
@@ -197,9 +220,7 @@ describe('SnsSqsPermissionConsumer', () => {
197220
})
198221

199222
await newConsumer.init()
200-
expect(newConsumer.subscriptionProps.queueUrl).toBe(
201-
`http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`,
202-
)
223+
expect(newConsumer.subscriptionProps.queueUrl).toBe(queueUrl)
203224
expect(newConsumer.subscriptionProps.queueName).toBe(queueName)
204225

205226
const updateCall = sqsSpy.mock.calls.find((entry) => {
@@ -382,9 +403,7 @@ describe('SnsSqsPermissionConsumer', () => {
382403
})
383404

384405
await newConsumer.init()
385-
expect(newConsumer.subscriptionProps.queueUrl).toBe(
386-
`http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`,
387-
)
406+
expect(newConsumer.subscriptionProps.queueUrl).toBe(queueUrl)
388407
expect(newConsumer.subscriptionProps.queueName).toBe(queueName)
389408

390409
const attributes = await getQueueAttributes(
@@ -426,9 +445,7 @@ describe('SnsSqsPermissionConsumer', () => {
426445
})
427446

428447
await newConsumer.init()
429-
expect(newConsumer.subscriptionProps.queueUrl).toBe(
430-
`http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`,
431-
)
448+
expect(newConsumer.subscriptionProps.queueUrl).toBe(queueUrl)
432449

433450
const attributes = await getQueueAttributes(
434451
sqsClient,
@@ -461,9 +478,7 @@ describe('SnsSqsPermissionConsumer', () => {
461478
})
462479

463480
await newConsumer.init()
464-
expect(newConsumer.subscriptionProps.queueUrl).toBe(
465-
`http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`,
466-
)
481+
expect(newConsumer.subscriptionProps.queueUrl).toBe(queueUrl)
467482
expect(newConsumer.subscriptionProps.queueName).toBe(queueName)
468483

469484
const attributes = await getQueueAttributes(
@@ -492,9 +507,7 @@ describe('SnsSqsPermissionConsumer', () => {
492507
})
493508

494509
await newConsumer.init()
495-
expect(newConsumer.subscriptionProps.queueUrl).toBe(
496-
`http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`,
497-
)
510+
expect(newConsumer.subscriptionProps.queueUrl).toBe(queueUrl)
498511
expect(newConsumer.subscriptionProps.deadLetterQueueUrl).toBe(
499512
'http://sqs.eu-west-1.localstack:4566/000000000000/deadLetterQueue',
500513
)
@@ -532,9 +545,7 @@ describe('SnsSqsPermissionConsumer', () => {
532545
})
533546

534547
await newConsumer.init()
535-
expect(newConsumer.subscriptionProps.queueUrl).toBe(
536-
`http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`,
537-
)
548+
expect(newConsumer.subscriptionProps.queueUrl).toBe(queueUrl)
538549
expect(newConsumer.subscriptionProps.deadLetterQueueUrl).toBe(
539550
'http://sqs.eu-west-1.localstack:4566/000000000000/deadLetterQueue',
540551
)

packages/sqs/index.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,13 @@ export {
1818
} from './lib/sqs/AbstractSqsPublisher'
1919
export type { SQSMessageOptions } from './lib/sqs/AbstractSqsPublisher'
2020

21-
export { assertQueue, deleteQueue, getQueueAttributes, getQueueUrl } from './lib/utils/sqsUtils'
21+
export {
22+
assertQueue,
23+
deleteQueue,
24+
getQueueAttributes,
25+
getQueueUrl,
26+
resolveQueueUrlFromLocatorConfig,
27+
} from './lib/utils/sqsUtils'
2228
export { deleteSqs, updateQueueAttributes } from './lib/utils/sqsInitter'
2329
export { deserializeSQSMessage } from './lib/utils/sqsMessageDeserializer'
2430
export {

packages/sqs/lib/sqs/AbstractSqsService.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,15 @@ export type SQSCreationConfig = {
2323
forceTagUpdate?: boolean
2424
} & ExtraSQSCreationParams
2525

26-
export type SQSQueueLocatorType = {
27-
queueUrl: string
28-
}
26+
export type SQSQueueLocatorType =
27+
| {
28+
queueUrl: string
29+
queueName?: never
30+
}
31+
| {
32+
queueName: string
33+
queueUrl?: never
34+
}
2935

3036
export abstract class AbstractSqsService<
3137
MessagePayloadType extends object,

packages/sqs/lib/utils/sqsInitter.ts

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@ import { isProduction } from '@message-queue-toolkit/core'
55

66
import type { SQSCreationConfig, SQSQueueLocatorType } from '../sqs/AbstractSqsService'
77

8-
import { assertQueue, deleteQueue, getQueueAttributes } from './sqsUtils'
8+
import {
9+
assertQueue,
10+
deleteQueue,
11+
getQueueAttributes,
12+
resolveQueueUrlFromLocatorConfig,
13+
} from './sqsUtils'
914

1015
export async function deleteSqs(
1116
sqsClient: SQSClient,
@@ -63,12 +68,11 @@ export async function initSqs(
6368
creationConfig?: SQSCreationConfig,
6469
) {
6570
// reuse existing queue only
66-
if (locatorConfig?.queueUrl) {
67-
const checkResult = await getQueueAttributes(
68-
sqsClient,
69-
(locatorConfig as SQSQueueLocatorType).queueUrl,
70-
['QueueArn'],
71-
)
71+
if (locatorConfig) {
72+
const queueUrl = await resolveQueueUrlFromLocatorConfig(sqsClient, locatorConfig)
73+
74+
const checkResult = await getQueueAttributes(sqsClient, queueUrl, ['QueueArn'])
75+
7276
if (checkResult.error === 'not_found') {
7377
throw new Error(`Queue with queueUrl ${locatorConfig.queueUrl} does not exist.`)
7478
}
@@ -78,8 +82,6 @@ export async function initSqs(
7882
throw new Error('Queue ARN was not set')
7983
}
8084

81-
const queueUrl = locatorConfig.queueUrl
82-
8385
const splitUrl = queueUrl.split('/')
8486
const queueName = splitUrl[splitUrl.length - 1]
8587
return { queueArn, queueUrl, queueName }

packages/sqs/lib/utils/sqsUtils.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import type { Either } from '@lokalise/node-core'
1616
import { globalLogger } from '@lokalise/node-core'
1717
import { isShallowSubset, waitAndRetry } from '@message-queue-toolkit/core'
1818

19-
import type { ExtraSQSCreationParams } from '../sqs/AbstractSqsService'
19+
import type { ExtraSQSCreationParams, SQSQueueLocatorType } from '../sqs/AbstractSqsService'
2020

2121
import { generateQueuePublishForTopicPolicy } from './sqsAttributeUtils'
2222
import { updateQueueAttributes, updateQueueTags } from './sqsInitter'
@@ -87,6 +87,27 @@ export async function getQueueAttributes(
8787
}
8888
}
8989

90+
export async function resolveQueueUrlFromLocatorConfig(
91+
sqsClient: SQSClient,
92+
locatorConfig: Partial<SQSQueueLocatorType>,
93+
) {
94+
if (locatorConfig.queueUrl) {
95+
return locatorConfig.queueUrl
96+
}
97+
98+
if (!locatorConfig.queueName) {
99+
throw new Error('Invalid locatorConfig setup - queueUrl or queueName must be provided')
100+
}
101+
102+
const queueUrlResult = await getQueueUrl(sqsClient, locatorConfig.queueName)
103+
104+
if (queueUrlResult.error === 'not_found') {
105+
throw new Error(`Queue with queueName ${locatorConfig.queueName} does not exist.`)
106+
}
107+
108+
return queueUrlResult.result
109+
}
110+
90111
async function updateExistingQueue(
91112
sqsClient: SQSClient,
92113
queueUrl: string,

packages/sqs/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@message-queue-toolkit/sqs",
3-
"version": "20.0.0",
3+
"version": "20.1.0",
44
"private": false,
55
"license": "MIT",
66
"description": "SQS adapter for message-queue-toolkit",

0 commit comments

Comments
 (0)