Skip to content

Commit 03126bc

Browse files
committed
Merge branch 'main' of https://github.com/kibertoad/message-queue-toolkit into feat/fauxsq
2 parents 0c787f9 + da9c9f3 commit 03126bc

5 files changed

Lines changed: 546 additions & 2 deletions

File tree

packages/sns/lib/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ export type {
1919
export { AbstractSnsSqsConsumer } from './sns/AbstractSnsSqsConsumer.ts'
2020
export * from './sns/CommonSnsPublisherFactory.ts'
2121
export { FakeConsumer } from './sns/fakes/FakeConsumer.ts'
22+
export { TestSnsPublisher, type TestSnsPublishOptions } from './sns/fakes/TestSnsPublisher.ts'
2223
export * from './sns/SnsPublisherManager.ts'
2324
export type { CommonMessage } from './types/MessageTypes.ts'
2425
export {
Lines changed: 377 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,377 @@
1+
import type { SNSClient } from '@aws-sdk/client-sns'
2+
import type { SQSClient } from '@aws-sdk/client-sqs'
3+
import type { STSClient } from '@aws-sdk/client-sts'
4+
import { waitAndRetry } from '@lokalise/node-core'
5+
import type { SQSMessage } from '@message-queue-toolkit/sqs'
6+
import { assertQueue, deleteQueue } from '@message-queue-toolkit/sqs'
7+
import type { AwilixContainer } from 'awilix'
8+
import { Consumer } from 'sqs-consumer'
9+
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
10+
import { SnsSqsPermissionConsumer } from '../../../test/consumers/SnsSqsPermissionConsumer.ts'
11+
import { SnsPermissionPublisher } from '../../../test/publishers/SnsPermissionPublisher.ts'
12+
import type { Dependencies } from '../../../test/utils/testContext.ts'
13+
import { registerDependencies } from '../../../test/utils/testContext.ts'
14+
import { subscribeToTopic } from '../../utils/snsSubscriber.ts'
15+
import { assertTopic, deleteTopic } from '../../utils/snsUtils.ts'
16+
import { TestSnsPublisher } from './TestSnsPublisher.ts'
17+
18+
const queueName = 'test-sns-publisher-queue'
19+
const topicName = 'test-sns-publisher-topic'
20+
const fifoTopicName = 'test-sns-publisher-topic.fifo'
21+
const fifoQueueName = 'test-sns-publisher-queue.fifo'
22+
23+
describe('TestSnsPublisher', () => {
24+
let diContainer: AwilixContainer<Dependencies>
25+
let sqsClient: SQSClient
26+
let snsClient: SNSClient
27+
let stsClient: STSClient
28+
let publisher: TestSnsPublisher
29+
30+
beforeEach(async () => {
31+
diContainer = await registerDependencies({}, false)
32+
sqsClient = diContainer.cradle.sqsClient
33+
snsClient = diContainer.cradle.snsClient
34+
stsClient = diContainer.cradle.stsClient
35+
publisher = new TestSnsPublisher(snsClient, stsClient)
36+
37+
await deleteQueue(sqsClient, queueName)
38+
await deleteQueue(sqsClient, fifoQueueName)
39+
await deleteTopic(snsClient, stsClient, topicName)
40+
await deleteTopic(snsClient, stsClient, fifoTopicName)
41+
})
42+
43+
afterEach(async () => {
44+
const { awilixManager } = diContainer.cradle
45+
await awilixManager.executeDispose()
46+
await diContainer.dispose()
47+
})
48+
49+
describe('publish with topicArn', () => {
50+
it('publishes arbitrary message without validation', async () => {
51+
const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName })
52+
const { queueUrl } = await assertQueue(sqsClient, { QueueName: queueName })
53+
await subscribeToTopic(
54+
sqsClient,
55+
snsClient,
56+
stsClient,
57+
{ QueueName: queueName },
58+
{ Name: topicName },
59+
{ updateAttributesIfExists: false },
60+
)
61+
62+
await publisher.publish(
63+
{ totally: 'arbitrary', data: { nested: true }, number: 42 },
64+
{ topicArn },
65+
)
66+
67+
const receivedMessages: SQSMessage[] = []
68+
const consumer = Consumer.create({
69+
queueUrl,
70+
sqs: sqsClient,
71+
// biome-ignore lint/suspicious/useAwait: Consumer.create requires async handler
72+
handleMessage: async (message: SQSMessage) => {
73+
receivedMessages.push(message)
74+
return message
75+
},
76+
})
77+
consumer.start()
78+
79+
await waitAndRetry(() => receivedMessages.length > 0)
80+
consumer.stop()
81+
82+
expect(receivedMessages).toHaveLength(1)
83+
const snsEnvelope = JSON.parse(receivedMessages[0]!.Body!)
84+
const body = JSON.parse(snsEnvelope.Message)
85+
expect(body).toEqual({
86+
totally: 'arbitrary',
87+
data: { nested: true },
88+
number: 42,
89+
})
90+
})
91+
92+
it('publishes message without required schema fields', async () => {
93+
const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName })
94+
const { queueUrl } = await assertQueue(sqsClient, { QueueName: queueName })
95+
await subscribeToTopic(
96+
sqsClient,
97+
snsClient,
98+
stsClient,
99+
{ QueueName: queueName },
100+
{ Name: topicName },
101+
{ updateAttributesIfExists: false },
102+
)
103+
104+
await publisher.publish({ incomplete: 'message' }, { topicArn })
105+
106+
const receivedMessages: SQSMessage[] = []
107+
const consumer = Consumer.create({
108+
queueUrl,
109+
sqs: sqsClient,
110+
// biome-ignore lint/suspicious/useAwait: Consumer.create requires async handler
111+
handleMessage: async (message: SQSMessage) => {
112+
receivedMessages.push(message)
113+
return message
114+
},
115+
})
116+
consumer.start()
117+
118+
await waitAndRetry(() => receivedMessages.length > 0)
119+
consumer.stop()
120+
121+
expect(receivedMessages).toHaveLength(1)
122+
const snsEnvelope = JSON.parse(receivedMessages[0]!.Body!)
123+
const body = JSON.parse(snsEnvelope.Message)
124+
expect(body).toEqual({ incomplete: 'message' })
125+
})
126+
})
127+
128+
describe('publish with topicName', () => {
129+
it('publishes message using topic name', async () => {
130+
await assertTopic(snsClient, stsClient, { Name: topicName })
131+
const { queueUrl } = await assertQueue(sqsClient, { QueueName: queueName })
132+
await subscribeToTopic(
133+
sqsClient,
134+
snsClient,
135+
stsClient,
136+
{ QueueName: queueName },
137+
{ Name: topicName },
138+
{ updateAttributesIfExists: false },
139+
)
140+
141+
await publisher.publish({ test: 'topicName' }, { topicName })
142+
143+
const receivedMessages: SQSMessage[] = []
144+
const consumer = Consumer.create({
145+
queueUrl,
146+
sqs: sqsClient,
147+
// biome-ignore lint/suspicious/useAwait: Consumer.create requires async handler
148+
handleMessage: async (message: SQSMessage) => {
149+
receivedMessages.push(message)
150+
return message
151+
},
152+
})
153+
consumer.start()
154+
155+
await waitAndRetry(() => receivedMessages.length > 0)
156+
consumer.stop()
157+
158+
expect(receivedMessages).toHaveLength(1)
159+
const snsEnvelope = JSON.parse(receivedMessages[0]!.Body!)
160+
const body = JSON.parse(snsEnvelope.Message)
161+
expect(body).toEqual({ test: 'topicName' })
162+
})
163+
})
164+
165+
describe('publish with publisher', () => {
166+
it('publishes to topic extracted from publisher', async () => {
167+
const regularPublisher = new SnsPermissionPublisher(diContainer.cradle, {
168+
creationConfig: {
169+
topic: { Name: topicName },
170+
},
171+
})
172+
await regularPublisher.init()
173+
174+
const { queueUrl } = await assertQueue(sqsClient, { QueueName: queueName })
175+
await subscribeToTopic(
176+
sqsClient,
177+
snsClient,
178+
stsClient,
179+
{ QueueName: queueName },
180+
{ Name: topicName },
181+
{ updateAttributesIfExists: false },
182+
)
183+
184+
await publisher.publish({ test: 'publisher' }, { publisher: regularPublisher })
185+
186+
const receivedMessages: SQSMessage[] = []
187+
const consumer = Consumer.create({
188+
queueUrl,
189+
sqs: sqsClient,
190+
// biome-ignore lint/suspicious/useAwait: Consumer.create requires async handler
191+
handleMessage: async (message: SQSMessage) => {
192+
receivedMessages.push(message)
193+
return message
194+
},
195+
})
196+
consumer.start()
197+
198+
await waitAndRetry(() => receivedMessages.length > 0)
199+
consumer.stop()
200+
201+
expect(receivedMessages).toHaveLength(1)
202+
const snsEnvelope = JSON.parse(receivedMessages[0]!.Body!)
203+
const body = JSON.parse(snsEnvelope.Message)
204+
expect(body).toEqual({ test: 'publisher' })
205+
206+
await regularPublisher.close()
207+
})
208+
209+
it('throws error when publisher is not initialized', async () => {
210+
const regularPublisher = new SnsPermissionPublisher(diContainer.cradle, {
211+
creationConfig: {
212+
topic: { Name: topicName },
213+
},
214+
})
215+
216+
await expect(
217+
publisher.publish({ test: 'data' }, { publisher: regularPublisher }),
218+
).rejects.toThrow('Publisher has not been initialized')
219+
})
220+
})
221+
222+
describe('publish with consumer', () => {
223+
it('publishes to topic extracted from consumer', async () => {
224+
const consumer = new SnsSqsPermissionConsumer(diContainer.cradle, {
225+
creationConfig: {
226+
queue: { QueueName: queueName },
227+
topic: { Name: topicName },
228+
},
229+
})
230+
await consumer.init()
231+
232+
// Use a separate queue to verify the message
233+
const verifyQueueName = 'test-sns-publisher-verify-queue'
234+
await deleteQueue(sqsClient, verifyQueueName)
235+
const { queueUrl: verifyQueueUrl } = await assertQueue(sqsClient, {
236+
QueueName: verifyQueueName,
237+
})
238+
await subscribeToTopic(
239+
sqsClient,
240+
snsClient,
241+
stsClient,
242+
{ QueueName: verifyQueueName },
243+
{ Name: topicName },
244+
{ updateAttributesIfExists: false },
245+
)
246+
247+
await publisher.publish({ test: 'consumer' }, { consumer })
248+
249+
const receivedMessages: SQSMessage[] = []
250+
const sqsConsumer = Consumer.create({
251+
queueUrl: verifyQueueUrl,
252+
sqs: sqsClient,
253+
// biome-ignore lint/suspicious/useAwait: Consumer.create requires async handler
254+
handleMessage: async (message: SQSMessage) => {
255+
receivedMessages.push(message)
256+
return message
257+
},
258+
})
259+
sqsConsumer.start()
260+
261+
await waitAndRetry(() => receivedMessages.length > 0)
262+
sqsConsumer.stop()
263+
264+
expect(receivedMessages).toHaveLength(1)
265+
const snsEnvelope = JSON.parse(receivedMessages[0]!.Body!)
266+
const body = JSON.parse(snsEnvelope.Message)
267+
expect(body).toEqual({ test: 'consumer' })
268+
269+
await consumer.close()
270+
await deleteQueue(sqsClient, verifyQueueName)
271+
})
272+
273+
it('throws error when consumer is not initialized', async () => {
274+
const consumer = new SnsSqsPermissionConsumer(diContainer.cradle, {
275+
creationConfig: {
276+
queue: { QueueName: queueName },
277+
topic: { Name: topicName },
278+
},
279+
})
280+
281+
await expect(publisher.publish({ test: 'data' }, { consumer })).rejects.toThrow(
282+
'Consumer has not been initialized',
283+
)
284+
})
285+
})
286+
287+
describe('publish to FIFO topic', () => {
288+
it('publishes to FIFO topic with MessageGroupId and MessageDeduplicationId', async () => {
289+
const topicArn = await assertTopic(snsClient, stsClient, {
290+
Name: fifoTopicName,
291+
Attributes: { FifoTopic: 'true', ContentBasedDeduplication: 'false' },
292+
})
293+
const { queueUrl: fifoQueueUrl } = await assertQueue(sqsClient, {
294+
QueueName: fifoQueueName,
295+
Attributes: { FifoQueue: 'true' },
296+
})
297+
await subscribeToTopic(
298+
sqsClient,
299+
snsClient,
300+
stsClient,
301+
{ QueueName: fifoQueueName, Attributes: { FifoQueue: 'true' } },
302+
{ Name: fifoTopicName, Attributes: { FifoTopic: 'true' } },
303+
{ updateAttributesIfExists: false },
304+
)
305+
306+
await publisher.publish(
307+
{ test: 'fifo-message' },
308+
{
309+
topicArn,
310+
MessageGroupId: 'test-group',
311+
MessageDeduplicationId: 'unique-id-123',
312+
},
313+
)
314+
315+
const receivedMessages: SQSMessage[] = []
316+
const consumer = Consumer.create({
317+
queueUrl: fifoQueueUrl,
318+
sqs: sqsClient,
319+
// biome-ignore lint/suspicious/useAwait: Consumer.create requires async handler
320+
handleMessage: async (message: SQSMessage) => {
321+
receivedMessages.push(message)
322+
return message
323+
},
324+
})
325+
consumer.start()
326+
327+
await waitAndRetry(() => receivedMessages.length > 0)
328+
consumer.stop()
329+
330+
expect(receivedMessages).toHaveLength(1)
331+
const snsEnvelope = JSON.parse(receivedMessages[0]!.Body!)
332+
const body = JSON.parse(snsEnvelope.Message)
333+
expect(body).toEqual({ test: 'fifo-message' })
334+
})
335+
})
336+
337+
describe('integration with consumer', () => {
338+
it('publishes a valid message that consumer can process', async () => {
339+
const consumer = new SnsSqsPermissionConsumer(diContainer.cradle, {
340+
creationConfig: {
341+
queue: { QueueName: queueName },
342+
topic: { Name: topicName },
343+
},
344+
})
345+
await consumer.start()
346+
347+
// Publish a message matching the consumer's expected schema
348+
await publisher.publish(
349+
{
350+
id: 'test-integration-1',
351+
messageType: 'add',
352+
},
353+
{ consumer },
354+
)
355+
356+
const result = await consumer.handlerSpy.waitForMessageWithId(
357+
'test-integration-1',
358+
'consumed',
359+
)
360+
expect(result.message).toMatchObject({
361+
id: 'test-integration-1',
362+
messageType: 'add',
363+
})
364+
365+
await consumer.close()
366+
})
367+
})
368+
369+
describe('error handling', () => {
370+
it('throws error when no topic specified', async () => {
371+
await expect(
372+
// @ts-expect-error - Testing invalid input
373+
publisher.publish({ test: 'data' }, {}),
374+
).rejects.toThrow('Either topicArn, topicName, consumer, or publisher must be provided')
375+
})
376+
})
377+
})

0 commit comments

Comments
 (0)