-
Notifications
You must be signed in to change notification settings - Fork 7
Add examples to the project #277
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0aaf103
40fa704
381d8b9
f64e0f2
80e9d69
c132459
ac1efc5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| { | ||
| "$schema": "./node_modules/@biomejs/biome/configuration_schema.json", | ||
| "extends": [ | ||
| "./node_modules/@lokalise/biome-config/configs/biome-base.jsonc", | ||
| "./node_modules/@lokalise/biome-config/configs/biome-esm.jsonc", | ||
| "./node_modules/@lokalise/biome-config/configs/biome-package.jsonc" | ||
| ] | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| services: | ||
|
|
||
| localstack: | ||
| image: localstack/localstack:4.0.2 | ||
| network_mode: bridge | ||
| hostname: localstack | ||
| ports: | ||
| - '127.0.0.1:4566:4566' # LocalStack Gateway | ||
| - '127.0.0.1:4510-4559:4510-4559' # external services port range | ||
| environment: | ||
| - SERVICES=sns,sqs,s3,sts | ||
| - DEBUG=0 | ||
| - DATA_DIR=${DATA_DIR-} | ||
| - DOCKER_HOST=unix:///var/run/docker.sock | ||
| - LOCALSTACK_HOST=localstack | ||
| # - LOCALSTACK_API_KEY=someDummyKey | ||
| volumes: | ||
| - '${TMPDIR:-/tmp}/localstack:/var/log/localstack' | ||
| - '/var/run/docker.sock:/var/run/docker.sock' | ||
| restart: on-failure |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| import { afterEach, beforeEach, describe, expect, it } from 'vitest' | ||
| import { userConsumer } from './common/Dependencies.ts' | ||
| import { publisherManager } from './common/TestPublisherManager.ts' | ||
| import { UserConsumer } from './common/UserConsumer.js' | ||
|
|
||
| describe('Publish message', () => { | ||
| beforeEach(async () => { | ||
| await publisherManager.initRegisteredPublishers([UserConsumer.SUBSCRIBED_TOPIC_NAME]) | ||
| await userConsumer.start() | ||
| }) | ||
|
|
||
| afterEach(async () => { | ||
| await userConsumer.close() | ||
| }) | ||
|
|
||
| it('Publishes a message', async () => { | ||
| await publisherManager.publish(UserConsumer.SUBSCRIBED_TOPIC_NAME, { | ||
| type: 'user.created', | ||
| payload: { | ||
| id: '123', | ||
| name: 'John Doe', | ||
| }, | ||
| }) | ||
|
|
||
| const receivedMessage = await userConsumer.handlerSpy.waitForMessage({ | ||
| type: 'user.created', | ||
| }) | ||
|
|
||
| expect(receivedMessage.message.payload).toMatchInlineSnapshot(` | ||
| { | ||
| "id": "123", | ||
| "name": "John Doe", | ||
| } | ||
| `) | ||
| }) | ||
| }) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| import { SNSClient, type SNSClientConfig } from '@aws-sdk/client-sns' | ||
| import { SQSClient } from '@aws-sdk/client-sqs' | ||
| import { STSClient } from '@aws-sdk/client-sts' | ||
| import { SnsConsumerErrorResolver } from '@message-queue-toolkit/sns' | ||
| import { pino } from 'pino' | ||
| import { UserConsumer } from './UserConsumer.ts' | ||
|
|
||
| export const TEST_AWS_CONFIG: SNSClientConfig = { | ||
| endpoint: 'http://s3.localhost.localstack.cloud:4566', | ||
| region: 'eu-west-1', | ||
| credentials: { | ||
| accessKeyId: 'access', | ||
| secretAccessKey: 'secret', | ||
| }, | ||
| } | ||
|
|
||
| export const sqsClient = new SQSClient(TEST_AWS_CONFIG) | ||
| export const snsClient = new SNSClient(TEST_AWS_CONFIG) | ||
| export const stsClient = new STSClient(TEST_AWS_CONFIG) | ||
|
|
||
| export const errorReporter = { report: () => {} } | ||
| export const logger = pino() | ||
| export const transactionObservabilityManager = { | ||
| start: () => {}, | ||
| startWithGroup: () => {}, | ||
| stop: () => {}, | ||
| addCustomAttributes: () => {}, | ||
| } | ||
|
|
||
| export const userConsumer = new UserConsumer({ | ||
| errorReporter, | ||
| logger, | ||
| transactionObservabilityManager, | ||
| consumerErrorResolver: new SnsConsumerErrorResolver(), | ||
| sqsClient, | ||
| snsClient, | ||
| stsClient, | ||
| }) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| import { | ||
| type SnsAwareEventDefinition, | ||
| enrichMessageSchemaWithBaseStrict, | ||
| } from '@message-queue-toolkit/schemas' | ||
| import type { CommonEventDefinition } from '@message-queue-toolkit/schemas' | ||
| import { z } from 'zod' | ||
|
|
||
| type AllConsumerMessageSchemas<MessageDefinitionTypes extends CommonEventDefinition[]> = z.infer< | ||
| MessageDefinitionTypes[number]['consumerSchema'] | ||
| > | ||
|
|
||
| export const USER_SCHEMA = z.object({ | ||
| id: z.string(), | ||
| name: z.string(), | ||
| age: z.number().optional(), | ||
| }) | ||
|
|
||
| export const UserEvents = { | ||
| created: { | ||
| ...enrichMessageSchemaWithBaseStrict('user.created', USER_SCHEMA, { | ||
| description: 'User was created', | ||
| }), | ||
| schemaVersion: '1.0.1', | ||
| producedBy: ['user-service'], | ||
| domain: 'users', | ||
| snsTopic: 'user', | ||
| }, | ||
|
|
||
| updated: { | ||
| ...enrichMessageSchemaWithBaseStrict('user.updated', USER_SCHEMA, { | ||
| description: 'User was updated', | ||
| }), | ||
| schemaVersion: '1.0.1', | ||
| producedBy: ['user-service'], | ||
| domain: 'users', | ||
| snsTopic: 'user', | ||
| }, | ||
| } satisfies Record<string, SnsAwareEventDefinition> | ||
|
|
||
| export type UserEventsType = (typeof UserEvents)[keyof typeof UserEvents][] | ||
| export type UserEventConsumerPayloadsType = AllConsumerMessageSchemas<UserEventsType> | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| import { CommonMetadataFiller, EventRegistry } from '@message-queue-toolkit/core' | ||
| import type { allPublisherMessageSchemas } from '@message-queue-toolkit/schemas' | ||
| import { SnsPublisherManager } from '@message-queue-toolkit/sns' | ||
| import type { CommonSnsPublisher } from '@message-queue-toolkit/sns' | ||
| import { CommonSnsPublisherFactory } from '@message-queue-toolkit/sns' | ||
| import { errorReporter, logger, snsClient, stsClient } from './Dependencies.ts' | ||
| import { UserEvents, type UserEventsType } from './TestMessages.ts' | ||
|
|
||
| const isTest = true | ||
|
|
||
| type PublisherTypes = allPublisherMessageSchemas<UserEventsType> | ||
|
|
||
| export const publisherManager = new SnsPublisherManager< | ||
| CommonSnsPublisher<PublisherTypes>, | ||
| UserEventsType | ||
| >( | ||
| { | ||
| errorReporter, | ||
| logger, | ||
| eventRegistry: new EventRegistry(Object.values(UserEvents)), | ||
| snsClient, | ||
| stsClient, | ||
| }, | ||
| { | ||
| metadataFiller: new CommonMetadataFiller({ | ||
| serviceId: 'service', | ||
| }), | ||
| publisherFactory: new CommonSnsPublisherFactory(), | ||
| newPublisherOptions: { | ||
| handlerSpy: true, | ||
| messageIdField: 'id', | ||
| messageTypeField: 'type', | ||
| deletionConfig: { | ||
| deleteIfExists: isTest, // only enable this in tests | ||
| // and ensure that the owning side is doing the deletion. | ||
| // if it is enabled both on consumer and publisher side, you are likely to experience confusing behaviour | ||
| }, | ||
| creationConfig: { | ||
| updateAttributesIfExists: true, | ||
| }, | ||
| }, | ||
| }, | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core' | ||
| import type { ConsumerMessageSchema } from '@message-queue-toolkit/schemas' | ||
| import { AbstractSnsSqsConsumer, type SNSSQSConsumerDependencies } from '@message-queue-toolkit/sns' | ||
| import { UserEvents } from './TestMessages.ts' | ||
| import { userCreatedHandler } from './handlers/UserCreatedHandler.ts' | ||
| import { userUpdatedHandler } from './handlers/UserUpdatedHandler.ts' | ||
|
|
||
| type SupportedMessages = ConsumerMessageSchema< | ||
| typeof UserEvents.created | typeof UserEvents.updated | ||
| > | ||
|
|
||
| // biome-ignore lint/complexity/noBannedTypes: to be expanded later | ||
| type ExecutionContext = {} | ||
|
|
||
| export class UserConsumer extends AbstractSnsSqsConsumer<SupportedMessages, ExecutionContext> { | ||
| public static readonly CONSUMED_QUEUE_NAME = 'user-my_service' | ||
| public static readonly SUBSCRIBED_TOPIC_NAME = 'user' | ||
|
|
||
| constructor(dependencies: SNSSQSConsumerDependencies) { | ||
| super( | ||
| dependencies, | ||
| { | ||
| handlerSpy: true, | ||
| handlers: new MessageHandlerConfigBuilder<SupportedMessages, ExecutionContext>() | ||
| .addConfig(UserEvents.created, userCreatedHandler, {}) | ||
| .addConfig(UserEvents.updated, userUpdatedHandler, {}) | ||
| .build(), | ||
| messageTypeField: 'type', | ||
| creationConfig: { | ||
| queue: { | ||
| QueueName: UserConsumer.CONSUMED_QUEUE_NAME, | ||
| }, | ||
| }, | ||
| locatorConfig: { | ||
| topicName: UserConsumer.SUBSCRIBED_TOPIC_NAME, | ||
| }, | ||
| subscriptionConfig: { | ||
| updateAttributesIfExists: false, | ||
| }, | ||
| }, | ||
| {}, | ||
| ) | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| import type { Either } from '@lokalise/node-core' | ||
| import type { z } from 'zod' | ||
| import type { USER_SCHEMA, UserEvents } from '../TestMessages.ts' | ||
|
|
||
| let _latestData: z.infer<typeof USER_SCHEMA> | ||
|
|
||
| export function userCreatedHandler( | ||
| message: z.infer<typeof UserEvents.created.consumerSchema>, | ||
| ): Promise<Either<'retryLater', 'success'>> { | ||
| _latestData = message.payload | ||
|
|
||
| return Promise.resolve({ result: 'success' }) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| import type { Either } from '@lokalise/node-core' | ||
| import type { z } from 'zod' | ||
| import type { USER_SCHEMA, UserEvents } from '../TestMessages.ts' | ||
|
|
||
| let _latestData: z.infer<typeof USER_SCHEMA> | ||
|
|
||
| export function userUpdatedHandler( | ||
| message: z.infer<typeof UserEvents.updated.consumerSchema>, | ||
| ): Promise<Either<'retryLater', 'success'>> { | ||
| _latestData = message.payload | ||
|
|
||
| return Promise.resolve({ result: 'success' }) | ||
| } | ||
|
Comment on lines
+7
to
+13
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do not have to be in this PR, but would be nice to show how to use barrier pattern and pre-handlers |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| { | ||
| "name": "@message-queue-toolkit/sns-sqs-examples", | ||
| "version": "1.0.0", | ||
| "description": "", | ||
| "scripts": { | ||
| "build": "tsc", | ||
| "lint": "biome check . && tsc", | ||
| "lint:fix": "biome check --write .", | ||
| "docker:start": "docker compose up -d", | ||
| "docker:stop": "docker compose down", | ||
| "test": "vitest --coverage" | ||
| }, | ||
| "type": "module", | ||
| "repository": { | ||
| "type": "git", | ||
| "url": "https://github.com/kibertoad/message-queue-toolkit.git" | ||
| }, | ||
| "dependencies": { | ||
| "@aws-sdk/client-sns": "^3.812.0", | ||
| "@aws-sdk/client-sts": "^3.812.0", | ||
| "@aws-sdk/client-sqs": "^3.812.0", | ||
| "@message-queue-toolkit/core": "^21.1.1", | ||
| "@message-queue-toolkit/schemas": "^6.1.0", | ||
| "@message-queue-toolkit/sns": "22.0.1", | ||
| "@message-queue-toolkit/sqs": "21.0.1", | ||
| "pino": "^9.7.0", | ||
| "zod": "^3.24.4" | ||
| }, | ||
| "devDependencies": { | ||
| "@biomejs/biome": "1.9.4", | ||
| "@lokalise/biome-config": "^2.0.0", | ||
| "@types/node": "^22.15.18", | ||
| "@lokalise/tsconfig": "^1.3.0", | ||
| "@vitest/coverage-v8": "^3.1.3", | ||
| "typescript": "^5.8.3", | ||
| "vitest": "^3.1.3" | ||
| }, | ||
| "private": true | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| { | ||
| "extends": "@lokalise/tsconfig/tsc", | ||
| "include": ["lib/**/*", "test/**/*", "vitest.config.ts"], | ||
| "compilerOptions": { | ||
| "types": ["vitest/globals"] | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| import { defineConfig } from 'vitest/config' | ||
|
|
||
| // biome-ignore lint/style/noDefaultExport: vite expects default export | ||
| export default defineConfig({ | ||
| test: { | ||
| globals: true, | ||
| watch: false, | ||
| restoreMocks: true, | ||
| pool: 'threads', | ||
| poolOptions: { | ||
| threads: { singleThread: true }, | ||
| }, | ||
| coverage: { | ||
| provider: 'v8', | ||
| include: ['lib/**/*.ts'], | ||
| exclude: ['vitest.config.ts', 'lib/**/index.ts'], | ||
| thresholds: { | ||
| lines: 95, | ||
| functions: 55, | ||
| branches: 100, | ||
| statements: 95, | ||
| }, | ||
| }, | ||
| }, | ||
| }) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| import type { ZodObject, ZodTypeAny } from 'zod' | ||
| import type z from 'zod' | ||
|
|
||
| import type { MetadataObject } from '../messages/baseMessageSchemas.js' | ||
| import type { CONSUMER_BASE_EVENT_SCHEMA, PUBLISHER_BASE_EVENT_SCHEMA } from './baseEventSchemas.ts' | ||
|
|
||
| export type EventTypeNames<EventDefinition extends CommonEventDefinition> = | ||
|
|
@@ -12,7 +13,8 @@ export function isCommonEventDefinition(entity: unknown): entity is CommonEventD | |
|
|
||
| export type CommonEventDefinition = { | ||
| consumerSchema: ZodObject< | ||
| Omit<(typeof CONSUMER_BASE_EVENT_SCHEMA)['shape'], 'payload'> & { | ||
| Omit<(typeof CONSUMER_BASE_EVENT_SCHEMA)['shape'], 'payload' | 'metadata'> & { | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ensuring that metadata here and in baseMessageSchema come from the same place make IDE much happier about infering types for whatever reason |
||
| metadata: MetadataObject | ||
| payload: ZodTypeAny | ||
| } | ||
| > | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| import { expectTypeOf } from 'vitest' | ||
| import { z } from 'zod' | ||
| import type { SnsAwareEventDefinition } from '../vendors/snsSchemas.ts' | ||
| import { enrichMessageSchemaWithBase } from './baseMessageSchemas.ts' | ||
|
|
||
| const myEvents = { | ||
| myEvent: { | ||
| ...enrichMessageSchemaWithBase('user.updated', z.object({})), | ||
| snsTopic: 'user', | ||
| producedBy: ['USER_SERVICE'], | ||
| }, | ||
| } as const satisfies Record<string, SnsAwareEventDefinition> | ||
|
|
||
| describe('base message schemas', () => { | ||
| it('message satisfies SnsAwareEventDefinition', () => { | ||
| expectTypeOf(myEvents.myEvent).toExtend<SnsAwareEventDefinition>() | ||
| }) | ||
| }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will switch to the one from the library after it is released