Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions examples/sns-sqs/biome.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"$schema": "./node_modules/@biomejs/biome/configuration_schema.json",
"extends": [
"./node_modules/@lokalise/biome-config/configs/biome-base.jsonc",
"./node_modules/@lokalise/biome-config/configs/biome-esm.jsonc",
"./node_modules/@lokalise/biome-config/configs/biome-package.jsonc"
]
}
20 changes: 20 additions & 0 deletions examples/sns-sqs/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
services:

localstack:
image: localstack/localstack:4.0.2
network_mode: bridge
hostname: localstack
ports:
- '127.0.0.1:4566:4566' # LocalStack Gateway
- '127.0.0.1:4510-4559:4510-4559' # external services port range
environment:
- SERVICES=sns,sqs,s3,sts
- DEBUG=0
- DATA_DIR=${DATA_DIR-}
- DOCKER_HOST=unix:///var/run/docker.sock
- LOCALSTACK_HOST=localstack
# - LOCALSTACK_API_KEY=someDummyKey
volumes:
- '${TMPDIR:-/tmp}/localstack:/var/log/localstack'
- '/var/run/docker.sock:/var/run/docker.sock'
restart: on-failure
36 changes: 36 additions & 0 deletions examples/sns-sqs/lib/01-publish-message.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
import { userConsumer } from './common/Dependencies.ts'
import { publisherManager } from './common/TestPublisherManager.ts'
import { UserConsumer } from './common/UserConsumer.js'

describe('Publish message', () => {
beforeEach(async () => {
await publisherManager.initRegisteredPublishers([UserConsumer.SUBSCRIBED_TOPIC_NAME])
await userConsumer.start()
})

afterEach(async () => {
await userConsumer.close()
})

it('Publishes a message', async () => {
await publisherManager.publish(UserConsumer.SUBSCRIBED_TOPIC_NAME, {
type: 'user.created',
payload: {
id: '123',
name: 'John Doe',
},
})

const receivedMessage = await userConsumer.handlerSpy.waitForMessage({
type: 'user.created',
})

expect(receivedMessage.message.payload).toMatchInlineSnapshot(`
{
"id": "123",
"name": "John Doe",
}
`)
})
})
38 changes: 38 additions & 0 deletions examples/sns-sqs/lib/common/Dependencies.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { SNSClient, type SNSClientConfig } from '@aws-sdk/client-sns'
import { SQSClient } from '@aws-sdk/client-sqs'
import { STSClient } from '@aws-sdk/client-sts'
import { SnsConsumerErrorResolver } from '@message-queue-toolkit/sns'
import { pino } from 'pino'
import { UserConsumer } from './UserConsumer.ts'

export const TEST_AWS_CONFIG: SNSClientConfig = {
endpoint: 'http://s3.localhost.localstack.cloud:4566',
region: 'eu-west-1',
credentials: {
accessKeyId: 'access',
secretAccessKey: 'secret',
},
}

export const sqsClient = new SQSClient(TEST_AWS_CONFIG)
export const snsClient = new SNSClient(TEST_AWS_CONFIG)
export const stsClient = new STSClient(TEST_AWS_CONFIG)

export const errorReporter = { report: () => {} }
export const logger = pino()
export const transactionObservabilityManager = {
start: () => {},
startWithGroup: () => {},
stop: () => {},
addCustomAttributes: () => {},
}

export const userConsumer = new UserConsumer({
errorReporter,
logger,
transactionObservabilityManager,
consumerErrorResolver: new SnsConsumerErrorResolver(),
sqsClient,
snsClient,
stsClient,
})
41 changes: 41 additions & 0 deletions examples/sns-sqs/lib/common/TestMessages.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import {
type SnsAwareEventDefinition,
enrichMessageSchemaWithBaseStrict,
} from '@message-queue-toolkit/schemas'
import type { CommonEventDefinition } from '@message-queue-toolkit/schemas'
import { z } from 'zod'

type AllConsumerMessageSchemas<MessageDefinitionTypes extends CommonEventDefinition[]> = z.infer<

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will switch to the one from the library after it is released

MessageDefinitionTypes[number]['consumerSchema']
>

export const USER_SCHEMA = z.object({
id: z.string(),
name: z.string(),
age: z.number().optional(),
})

export const UserEvents = {
created: {
...enrichMessageSchemaWithBaseStrict('user.created', USER_SCHEMA, {
description: 'User was created',
}),
schemaVersion: '1.0.1',
producedBy: ['user-service'],
domain: 'users',
snsTopic: 'user',
},

updated: {
...enrichMessageSchemaWithBaseStrict('user.updated', USER_SCHEMA, {
description: 'User was updated',
}),
schemaVersion: '1.0.1',
producedBy: ['user-service'],
domain: 'users',
snsTopic: 'user',
},
} satisfies Record<string, SnsAwareEventDefinition>

export type UserEventsType = (typeof UserEvents)[keyof typeof UserEvents][]
export type UserEventConsumerPayloadsType = AllConsumerMessageSchemas<UserEventsType>
43 changes: 43 additions & 0 deletions examples/sns-sqs/lib/common/TestPublisherManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { CommonMetadataFiller, EventRegistry } from '@message-queue-toolkit/core'
import type { allPublisherMessageSchemas } from '@message-queue-toolkit/schemas'
import { SnsPublisherManager } from '@message-queue-toolkit/sns'
import type { CommonSnsPublisher } from '@message-queue-toolkit/sns'
import { CommonSnsPublisherFactory } from '@message-queue-toolkit/sns'
import { errorReporter, logger, snsClient, stsClient } from './Dependencies.ts'
import { UserEvents, type UserEventsType } from './TestMessages.ts'

const isTest = true

type PublisherTypes = allPublisherMessageSchemas<UserEventsType>

export const publisherManager = new SnsPublisherManager<
CommonSnsPublisher<PublisherTypes>,
UserEventsType
>(
{
errorReporter,
logger,
eventRegistry: new EventRegistry(Object.values(UserEvents)),
snsClient,
stsClient,
},
{
metadataFiller: new CommonMetadataFiller({
serviceId: 'service',
}),
publisherFactory: new CommonSnsPublisherFactory(),
newPublisherOptions: {
handlerSpy: true,
messageIdField: 'id',
messageTypeField: 'type',
deletionConfig: {
deleteIfExists: isTest, // only enable this in tests
// and ensure that the owning side is doing the deletion.
// if it is enabled both on consumer and publisher side, you are likely to experience confusing behaviour
},
creationConfig: {
updateAttributesIfExists: true,
},
},
},
)
44 changes: 44 additions & 0 deletions examples/sns-sqs/lib/common/UserConsumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core'
import type { ConsumerMessageSchema } from '@message-queue-toolkit/schemas'
import { AbstractSnsSqsConsumer, type SNSSQSConsumerDependencies } from '@message-queue-toolkit/sns'
import { UserEvents } from './TestMessages.ts'
import { userCreatedHandler } from './handlers/UserCreatedHandler.ts'
import { userUpdatedHandler } from './handlers/UserUpdatedHandler.ts'

type SupportedMessages = ConsumerMessageSchema<
typeof UserEvents.created | typeof UserEvents.updated
>

// biome-ignore lint/complexity/noBannedTypes: to be expanded later
type ExecutionContext = {}

export class UserConsumer extends AbstractSnsSqsConsumer<SupportedMessages, ExecutionContext> {
public static readonly CONSUMED_QUEUE_NAME = 'user-my_service'
public static readonly SUBSCRIBED_TOPIC_NAME = 'user'

constructor(dependencies: SNSSQSConsumerDependencies) {
super(
dependencies,
{
handlerSpy: true,
handlers: new MessageHandlerConfigBuilder<SupportedMessages, ExecutionContext>()
.addConfig(UserEvents.created, userCreatedHandler, {})
.addConfig(UserEvents.updated, userUpdatedHandler, {})
.build(),
messageTypeField: 'type',
creationConfig: {
queue: {
QueueName: UserConsumer.CONSUMED_QUEUE_NAME,
},
},
locatorConfig: {
topicName: UserConsumer.SUBSCRIBED_TOPIC_NAME,
},
subscriptionConfig: {
updateAttributesIfExists: false,
},
},
{},
)
}
}
13 changes: 13 additions & 0 deletions examples/sns-sqs/lib/common/handlers/UserCreatedHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import type { Either } from '@lokalise/node-core'
import type { z } from 'zod'
import type { USER_SCHEMA, UserEvents } from '../TestMessages.ts'

let _latestData: z.infer<typeof USER_SCHEMA>

export function userCreatedHandler(
message: z.infer<typeof UserEvents.created.consumerSchema>,
): Promise<Either<'retryLater', 'success'>> {
_latestData = message.payload

return Promise.resolve({ result: 'success' })
}
13 changes: 13 additions & 0 deletions examples/sns-sqs/lib/common/handlers/UserUpdatedHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import type { Either } from '@lokalise/node-core'
import type { z } from 'zod'
import type { USER_SCHEMA, UserEvents } from '../TestMessages.ts'

let _latestData: z.infer<typeof USER_SCHEMA>

export function userUpdatedHandler(
message: z.infer<typeof UserEvents.updated.consumerSchema>,
): Promise<Either<'retryLater', 'success'>> {
_latestData = message.payload

return Promise.resolve({ result: 'success' })
}
Comment on lines +7 to +13

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not have to be in this PR, but would be nice to show how to use barrier pattern and pre-handlers

39 changes: 39 additions & 0 deletions examples/sns-sqs/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{
"name": "@message-queue-toolkit/sns-sqs-examples",
"version": "1.0.0",
"description": "",
"scripts": {
"build": "tsc",
"lint": "biome check . && tsc",
"lint:fix": "biome check --write .",
"docker:start": "docker compose up -d",
"docker:stop": "docker compose down",
"test": "vitest --coverage"
},
"type": "module",
"repository": {
"type": "git",
"url": "https://github.com/kibertoad/message-queue-toolkit.git"
},
"dependencies": {
"@aws-sdk/client-sns": "^3.812.0",
"@aws-sdk/client-sts": "^3.812.0",
"@aws-sdk/client-sqs": "^3.812.0",
"@message-queue-toolkit/core": "^21.1.1",
"@message-queue-toolkit/schemas": "^6.1.0",
"@message-queue-toolkit/sns": "22.0.1",
"@message-queue-toolkit/sqs": "21.0.1",
"pino": "^9.7.0",
"zod": "^3.24.4"
},
"devDependencies": {
"@biomejs/biome": "1.9.4",
"@lokalise/biome-config": "^2.0.0",
"@types/node": "^22.15.18",
"@lokalise/tsconfig": "^1.3.0",
"@vitest/coverage-v8": "^3.1.3",
"typescript": "^5.8.3",
"vitest": "^3.1.3"
},
"private": true
}
7 changes: 7 additions & 0 deletions examples/sns-sqs/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"extends": "@lokalise/tsconfig/tsc",
"include": ["lib/**/*", "test/**/*", "vitest.config.ts"],
"compilerOptions": {
"types": ["vitest/globals"]
}
}
25 changes: 25 additions & 0 deletions examples/sns-sqs/vitest.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { defineConfig } from 'vitest/config'

// biome-ignore lint/style/noDefaultExport: vite expects default export
export default defineConfig({
test: {
globals: true,
watch: false,
restoreMocks: true,
pool: 'threads',
poolOptions: {
threads: { singleThread: true },
},
coverage: {
provider: 'v8',
include: ['lib/**/*.ts'],
exclude: ['vitest.config.ts', 'lib/**/index.ts'],
thresholds: {
lines: 95,
functions: 55,
branches: 100,
statements: 95,
},
},
},
})
4 changes: 3 additions & 1 deletion packages/schemas/lib/events/eventTypes.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { ZodObject, ZodTypeAny } from 'zod'
import type z from 'zod'

import type { MetadataObject } from '../messages/baseMessageSchemas.js'
import type { CONSUMER_BASE_EVENT_SCHEMA, PUBLISHER_BASE_EVENT_SCHEMA } from './baseEventSchemas.ts'

export type EventTypeNames<EventDefinition extends CommonEventDefinition> =
Expand All @@ -12,7 +13,8 @@ export function isCommonEventDefinition(entity: unknown): entity is CommonEventD

export type CommonEventDefinition = {
consumerSchema: ZodObject<
Omit<(typeof CONSUMER_BASE_EVENT_SCHEMA)['shape'], 'payload'> & {
Omit<(typeof CONSUMER_BASE_EVENT_SCHEMA)['shape'], 'payload' | 'metadata'> & {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ensuring that metadata here and in baseMessageSchema come from the same place make IDE much happier about infering types for whatever reason

metadata: MetadataObject
payload: ZodTypeAny
}
>
Expand Down
18 changes: 18 additions & 0 deletions packages/schemas/lib/messages/baseMessageSchemas.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { expectTypeOf } from 'vitest'
import { z } from 'zod'
import type { SnsAwareEventDefinition } from '../vendors/snsSchemas.ts'
import { enrichMessageSchemaWithBase } from './baseMessageSchemas.ts'

const myEvents = {
myEvent: {
...enrichMessageSchemaWithBase('user.updated', z.object({})),
snsTopic: 'user',
producedBy: ['USER_SERVICE'],
},
} as const satisfies Record<string, SnsAwareEventDefinition>

describe('base message schemas', () => {
it('message satisfies SnsAwareEventDefinition', () => {
expectTypeOf(myEvents.myEvent).toExtend<SnsAwareEventDefinition>()
})
})
Loading
Loading