Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 12 additions & 18 deletions packages/sns/lib/utils/snsInitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ import type { CreateTopicCommandInput, SNSClient } from '@aws-sdk/client-sns'
import type { CreateQueueCommandInput, SQSClient } from '@aws-sdk/client-sqs'
import type { DeletionConfig, ExtraParams } from '@message-queue-toolkit/core'
import { isProduction } from '@message-queue-toolkit/core'
import type { SQSCreationConfig, SQSQueueLocatorType } from '@message-queue-toolkit/sqs'
import {
type SQSCreationConfig,
resolveQueueUrlFromLocatorConfig,
} from '@message-queue-toolkit/sqs'
import { deleteQueue, getQueueAttributes } from '@message-queue-toolkit/sqs'

import type { SNSCreationConfig, SNSTopicLocatorType } from '../sns/AbstractSnsService'
Expand Down Expand Up @@ -54,7 +57,7 @@ export async function initSnsSqs(
}

const topicResolutionOptions: TopicResolutionOptions = {
...locatorConfig,
...(locatorConfig as SNSSQSQueueLocatorType),
...creationConfig.topic,
}

Expand Down Expand Up @@ -86,11 +89,7 @@ export async function initSnsSqs(
}
}

if (!locatorConfig.queueUrl) {
throw new Error(
'If locatorConfig.subscriptionArn is provided, you have to also provide locatorConfig.queueUrl',
)
}
const queueUrl = await resolveQueueUrlFromLocatorConfig(sqsClient, locatorConfig)

const checkPromises: Promise<Either<'not_found', unknown>>[] = []
// Check for existing resources, using the locators
Expand All @@ -99,26 +98,21 @@ export async function initSnsSqs(
const topicPromise = getTopicAttributes(snsClient, subscriptionTopicArn)
checkPromises.push(topicPromise)

if (locatorConfig.queueUrl) {
const queuePromise = getQueueAttributes(
sqsClient,
(locatorConfig as SQSQueueLocatorType).queueUrl,
)
checkPromises.push(queuePromise)
}
const queuePromise = getQueueAttributes(sqsClient, queueUrl)
checkPromises.push(queuePromise)

const [topicCheckResult, queueCheckResult] = await Promise.all(checkPromises)

if (queueCheckResult?.error === 'not_found') {
throw new Error(`Queue with queueUrl ${locatorConfig.queueUrl} does not exist.`)
throw new Error(`Queue with queueUrl ${queueUrl} does not exist.`)
}
if (topicCheckResult.error === 'not_found') {
throw new Error(`Topic with topicArn ${locatorConfig.topicArn} does not exist.`)
}

let queueName: string
if ((locatorConfig as SQSQueueLocatorType).queueUrl) {
const splitUrl = (locatorConfig as SQSQueueLocatorType).queueUrl.split('/')
if (queueUrl) {
const splitUrl = queueUrl.split('/')
queueName = splitUrl[splitUrl.length - 1]
} else {
queueName = creationConfig!.queue.QueueName!
Expand All @@ -127,7 +121,7 @@ export async function initSnsSqs(
return {
subscriptionArn: locatorConfig.subscriptionArn,
topicArn: subscriptionTopicArn,
queueUrl: locatorConfig.queueUrl,
queueUrl,
queueName,
}
}
Expand Down
4 changes: 2 additions & 2 deletions packages/sns/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/sns",
"version": "21.0.0",
"version": "21.1.0",
"private": false,
"license": "MIT",
"description": "SNS adapter for message-queue-toolkit",
Expand Down Expand Up @@ -35,7 +35,7 @@
"@aws-sdk/client-sts": "^3.632.0",
"@message-queue-toolkit/core": ">=20.0.0",
"@message-queue-toolkit/schemas": ">=2.0.0",
"@message-queue-toolkit/sqs": ">=20.0.0"
"@message-queue-toolkit/sqs": ">=20.1.0"
},
"devDependencies": {
"@aws-sdk/client-s3": "^3.670.0",
Expand Down
69 changes: 40 additions & 29 deletions packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ describe('SnsSqsPermissionConsumer', () => {
const queueName = 'some-queue'
const topicNome = 'some-topic'

const queueUrl = `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`

let diContainer: AwilixContainer<Dependencies>
let sqsClient: SQSClient
let snsClient: SNSClient
Expand All @@ -44,7 +46,7 @@ describe('SnsSqsPermissionConsumer', () => {

const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, {
locatorConfig: {
queueUrl: `http://s3.localhost.localstack.cloud:4566/000000000000/${queueName}`,
queueUrl,
subscriptionArn: 'dummy',
topicArn: 'dummy',
},
Expand All @@ -53,7 +55,7 @@ describe('SnsSqsPermissionConsumer', () => {
await expect(() => newConsumer.init()).rejects.toThrow(/does not exist/)
})

it('does not create a new queue when queue locator is passed', async () => {
it('does not create a new queue when queue locator with url is passed', async () => {
await assertQueue(sqsClient, {
QueueName: queueName,
})
Expand All @@ -65,16 +67,41 @@ describe('SnsSqsPermissionConsumer', () => {
const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, {
locatorConfig: {
topicArn: arn,
queueUrl: `http://s3.localhost.localstack.cloud:4566/000000000000/${queueName}`,
queueUrl,
subscriptionArn:
'arn:aws:sns:eu-west-1:000000000000:user_permissions:bdf640a2-bedf-475a-98b8-758b88c87395',
},
})

await newConsumer.init()
expect(newConsumer.subscriptionProps.queueUrl).toBe(
`http://s3.localhost.localstack.cloud:4566/000000000000/${queueName}`,
expect(newConsumer.subscriptionProps.queueUrl).toBe(queueUrl)
expect(newConsumer.subscriptionProps.queueName).toBe(queueName)
expect(newConsumer.subscriptionProps.topicArn).toEqual(arn)
expect(newConsumer.subscriptionProps.subscriptionArn).toBe(
'arn:aws:sns:eu-west-1:000000000000:user_permissions:bdf640a2-bedf-475a-98b8-758b88c87395',
)
})

it('does not create a new queue when queue locator with name is passed', async () => {
await assertQueue(sqsClient, {
QueueName: queueName,
})

const arn = await assertTopic(snsClient, stsClient, {
Name: topicNome,
})

const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, {
locatorConfig: {
topicArn: arn,
queueName,
subscriptionArn:
'arn:aws:sns:eu-west-1:000000000000:user_permissions:bdf640a2-bedf-475a-98b8-758b88c87395',
},
})

await newConsumer.init()
expect(newConsumer.subscriptionProps.queueUrl).toBe(queueUrl)
expect(newConsumer.subscriptionProps.queueName).toBe(queueName)
expect(newConsumer.subscriptionProps.topicArn).toEqual(arn)
expect(newConsumer.subscriptionProps.subscriptionArn).toBe(
Expand All @@ -99,9 +126,7 @@ describe('SnsSqsPermissionConsumer', () => {
})

await newConsumer.init()
expect(newConsumer.subscriptionProps.queueUrl).toBe(
`http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`,
)
expect(newConsumer.subscriptionProps.queueUrl).toBe(queueUrl)
expect(newConsumer.subscriptionProps.queueName).toBe(queueName)
expect(newConsumer.subscriptionProps.topicArn).toEqual(arn)
expect(newConsumer.subscriptionProps.subscriptionArn).toMatch(
Expand Down Expand Up @@ -151,9 +176,7 @@ describe('SnsSqsPermissionConsumer', () => {
})

await newConsumer.init()
expect(newConsumer.subscriptionProps.queueUrl).toBe(
`http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`,
)
expect(newConsumer.subscriptionProps.queueUrl).toBe(queueUrl)
expect(newConsumer.subscriptionProps.queueName).toBe(queueName)

const updateCall = sqsSpy.mock.calls.find((entry) => {
Expand Down Expand Up @@ -197,9 +220,7 @@ describe('SnsSqsPermissionConsumer', () => {
})

await newConsumer.init()
expect(newConsumer.subscriptionProps.queueUrl).toBe(
`http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`,
)
expect(newConsumer.subscriptionProps.queueUrl).toBe(queueUrl)
expect(newConsumer.subscriptionProps.queueName).toBe(queueName)

const updateCall = sqsSpy.mock.calls.find((entry) => {
Expand Down Expand Up @@ -382,9 +403,7 @@ describe('SnsSqsPermissionConsumer', () => {
})

await newConsumer.init()
expect(newConsumer.subscriptionProps.queueUrl).toBe(
`http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`,
)
expect(newConsumer.subscriptionProps.queueUrl).toBe(queueUrl)
expect(newConsumer.subscriptionProps.queueName).toBe(queueName)

const attributes = await getQueueAttributes(
Expand Down Expand Up @@ -426,9 +445,7 @@ describe('SnsSqsPermissionConsumer', () => {
})

await newConsumer.init()
expect(newConsumer.subscriptionProps.queueUrl).toBe(
`http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`,
)
expect(newConsumer.subscriptionProps.queueUrl).toBe(queueUrl)

const attributes = await getQueueAttributes(
sqsClient,
Expand Down Expand Up @@ -461,9 +478,7 @@ describe('SnsSqsPermissionConsumer', () => {
})

await newConsumer.init()
expect(newConsumer.subscriptionProps.queueUrl).toBe(
`http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`,
)
expect(newConsumer.subscriptionProps.queueUrl).toBe(queueUrl)
expect(newConsumer.subscriptionProps.queueName).toBe(queueName)

const attributes = await getQueueAttributes(
Expand Down Expand Up @@ -492,9 +507,7 @@ describe('SnsSqsPermissionConsumer', () => {
})

await newConsumer.init()
expect(newConsumer.subscriptionProps.queueUrl).toBe(
`http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`,
)
expect(newConsumer.subscriptionProps.queueUrl).toBe(queueUrl)
expect(newConsumer.subscriptionProps.deadLetterQueueUrl).toBe(
'http://sqs.eu-west-1.localstack:4566/000000000000/deadLetterQueue',
)
Expand Down Expand Up @@ -532,9 +545,7 @@ describe('SnsSqsPermissionConsumer', () => {
})

await newConsumer.init()
expect(newConsumer.subscriptionProps.queueUrl).toBe(
`http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`,
)
expect(newConsumer.subscriptionProps.queueUrl).toBe(queueUrl)
expect(newConsumer.subscriptionProps.deadLetterQueueUrl).toBe(
'http://sqs.eu-west-1.localstack:4566/000000000000/deadLetterQueue',
)
Expand Down
8 changes: 7 additions & 1 deletion packages/sqs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@ export {
} from './lib/sqs/AbstractSqsPublisher'
export type { SQSMessageOptions } from './lib/sqs/AbstractSqsPublisher'

export { assertQueue, deleteQueue, getQueueAttributes, getQueueUrl } from './lib/utils/sqsUtils'
export {
assertQueue,
deleteQueue,
getQueueAttributes,
getQueueUrl,
resolveQueueUrlFromLocatorConfig,
} from './lib/utils/sqsUtils'
export { deleteSqs, updateQueueAttributes } from './lib/utils/sqsInitter'
export { deserializeSQSMessage } from './lib/utils/sqsMessageDeserializer'
export {
Expand Down
12 changes: 9 additions & 3 deletions packages/sqs/lib/sqs/AbstractSqsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,15 @@ export type SQSCreationConfig = {
forceTagUpdate?: boolean
} & ExtraSQSCreationParams

export type SQSQueueLocatorType = {
queueUrl: string
}
export type SQSQueueLocatorType =
| {
queueUrl: string
queueName?: never
}
| {
queueName: string
queueUrl?: never
}

export abstract class AbstractSqsService<
MessagePayloadType extends object,
Expand Down
20 changes: 11 additions & 9 deletions packages/sqs/lib/utils/sqsInitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ import { isProduction } from '@message-queue-toolkit/core'

import type { SQSCreationConfig, SQSQueueLocatorType } from '../sqs/AbstractSqsService'

import { assertQueue, deleteQueue, getQueueAttributes } from './sqsUtils'
import {
assertQueue,
deleteQueue,
getQueueAttributes,
resolveQueueUrlFromLocatorConfig,
} from './sqsUtils'

export async function deleteSqs(
sqsClient: SQSClient,
Expand Down Expand Up @@ -63,12 +68,11 @@ export async function initSqs(
creationConfig?: SQSCreationConfig,
) {
// reuse existing queue only
if (locatorConfig?.queueUrl) {
const checkResult = await getQueueAttributes(
sqsClient,
(locatorConfig as SQSQueueLocatorType).queueUrl,
['QueueArn'],
)
if (locatorConfig) {
const queueUrl = await resolveQueueUrlFromLocatorConfig(sqsClient, locatorConfig)

const checkResult = await getQueueAttributes(sqsClient, queueUrl, ['QueueArn'])

if (checkResult.error === 'not_found') {
throw new Error(`Queue with queueUrl ${locatorConfig.queueUrl} does not exist.`)
}
Expand All @@ -78,8 +82,6 @@ export async function initSqs(
throw new Error('Queue ARN was not set')
}

const queueUrl = locatorConfig.queueUrl

const splitUrl = queueUrl.split('/')
const queueName = splitUrl[splitUrl.length - 1]
return { queueArn, queueUrl, queueName }
Expand Down
23 changes: 22 additions & 1 deletion packages/sqs/lib/utils/sqsUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import type { Either } from '@lokalise/node-core'
import { globalLogger } from '@lokalise/node-core'
import { isShallowSubset, waitAndRetry } from '@message-queue-toolkit/core'

import type { ExtraSQSCreationParams } from '../sqs/AbstractSqsService'
import type { ExtraSQSCreationParams, SQSQueueLocatorType } from '../sqs/AbstractSqsService'

import { generateQueuePublishForTopicPolicy } from './sqsAttributeUtils'
import { updateQueueAttributes, updateQueueTags } from './sqsInitter'
Expand Down Expand Up @@ -87,6 +87,27 @@ export async function getQueueAttributes(
}
}

export async function resolveQueueUrlFromLocatorConfig(
sqsClient: SQSClient,
locatorConfig: Partial<SQSQueueLocatorType>,
) {
if (locatorConfig.queueUrl) {
return locatorConfig.queueUrl
}

if (!locatorConfig.queueName) {
throw new Error('Invalid locatorConfig setup - queueUrl or queueName must be provided')
}

const queueUrlResult = await getQueueUrl(sqsClient, locatorConfig.queueName)

if (queueUrlResult.error === 'not_found') {
throw new Error(`Queue with queueName ${locatorConfig.queueName} does not exist.`)
}

return queueUrlResult.result
}

async function updateExistingQueue(
sqsClient: SQSClient,
queueUrl: string,
Expand Down
2 changes: 1 addition & 1 deletion packages/sqs/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/sqs",
"version": "20.0.0",
"version": "20.1.0",
"private": false,
"license": "MIT",
"description": "SQS adapter for message-queue-toolkit",
Expand Down
Loading
Loading