diff --git a/packages/amqp/README.md b/packages/amqp/README.md index 16163a45..4c675fcd 100644 --- a/packages/amqp/README.md +++ b/packages/amqp/README.md @@ -66,3 +66,7 @@ publisher.publish(message) // This will succeed and consumer, which also received new connection, will be able to consume it publisher.publish(message) ``` + +## Lazy instantiation + +By default `message-queue-toolkit/amqp` will lazily instantiate publishers before the publish in case they weren't instantiated before. This may result in system not failing fast when connection to AMQP cannot be established, as instantiation happens asynchronously. In case that is an undesirable behaviour, you can set parameter `isLazyInitEnabled` to false either directly for a publisher, or as one of the `newPublisherOptions` parameters for the `AmqpQueuePublisherManager`. diff --git a/packages/amqp/lib/AbstractAmqpPublisher.ts b/packages/amqp/lib/AbstractAmqpPublisher.ts index 0d6f0b32..cb8f6d0c 100644 --- a/packages/amqp/lib/AbstractAmqpPublisher.ts +++ b/packages/amqp/lib/AbstractAmqpPublisher.ts @@ -22,6 +22,7 @@ export type AMQPPublisherOptions< LocatorConfig extends object, > = QueuePublisherOptions & { exchange?: string + isLazyInitEnabled?: boolean // default is true } export abstract class AbstractAmqpPublisher< @@ -40,9 +41,10 @@ export abstract class AbstractAmqpPublisher< > implements SyncPublisher { - private readonly messageSchemaContainer: MessageSchemaContainer protected readonly exchange?: string + private readonly messageSchemaContainer: MessageSchemaContainer + private readonly isLazyInitEnabled: boolean private initPromise?: Promise constructor( @@ -53,6 +55,7 @@ export abstract class AbstractAmqpPublisher< this.messageSchemaContainer = this.resolvePublisherMessageSchemaContainer(options) this.exchange = options.exchange + this.isLazyInitEnabled = options.isLazyInitEnabled ?? true } publish(message: MessagePayloadType, options: MessageOptionsType): void { @@ -64,7 +67,7 @@ export abstract class AbstractAmqpPublisher< const messageProcessingStartTimestamp = Date.now() // If it's not initted yet, do the lazy init - if (!this.isInitted) { + if (!this.isInitted && this.isLazyInitEnabled) { // avoid multiple concurrent inits if (!this.initPromise) { this.initPromise = this.init() diff --git a/packages/amqp/lib/AmqpQueuePublisherManager.spec.ts b/packages/amqp/lib/AmqpQueuePublisherManager.spec.ts index f4092bea..c01a6fac 100644 --- a/packages/amqp/lib/AmqpQueuePublisherManager.spec.ts +++ b/packages/amqp/lib/AmqpQueuePublisherManager.spec.ts @@ -57,5 +57,23 @@ describe('AmqpQueuePublisherManager', () => { } `) }) + + it('skips lazy init if not enabled', async () => { + const { queuePublisherManagerNoLazy } = diContainer.cradle + const fakeConsumer = new FakeQueueConsumer(diContainer.cradle, TestEvents.updated) + await fakeConsumer.start() + + expect(() => + queuePublisherManagerNoLazy.publishSync(FakeQueueConsumer.QUEUE_NAME, { + type: 'entity.updated', + payload: { + updatedData: 'msg', + }, + metadata: { + correlationId: 'some-id', + }, + }), + ).toThrow(/Error while publishing to AMQP Cannot read properties of undefined/) + }) }) }) diff --git a/packages/amqp/test/utils/testContext.ts b/packages/amqp/test/utils/testContext.ts index 6720a7b4..1f16d5a9 100644 --- a/packages/amqp/test/utils/testContext.ts +++ b/packages/amqp/test/utils/testContext.ts @@ -141,6 +141,7 @@ export async function registerDependencies( eventRegistry: asFunction(() => { return new EventRegistry(Object.values(TestEvents)) }, SINGLETON_CONFIG), + queuePublisherManager: asFunction( (dependencies) => { return new AmqpQueuePublisherManager(dependencies, { @@ -149,6 +150,7 @@ export async function registerDependencies( }), publisherFactory: new CommonAmqpQueuePublisherFactory(), newPublisherOptions: { + isLazyInitEnabled: true, handlerSpy: true, messageIdField: 'id', messageTypeField: 'type', @@ -160,6 +162,28 @@ export async function registerDependencies( enabled: queuesEnabled, }, ), + + queuePublisherManagerNoLazy: asFunction( + (dependencies) => { + return new AmqpQueuePublisherManager(dependencies, { + metadataFiller: new CommonMetadataFiller({ + serviceId: 'service', + }), + publisherFactory: new CommonAmqpQueuePublisherFactory(), + newPublisherOptions: { + isLazyInitEnabled: false, + handlerSpy: true, + messageIdField: 'id', + messageTypeField: 'type', + }, + }) + }, + { + lifetime: Lifetime.SINGLETON, + enabled: queuesEnabled, + }, + ), + topicPublisherManager: asFunction( (dependencies) => { return new AmqpTopicPublisherManager(dependencies, { @@ -225,6 +249,11 @@ export interface Dependencies { TestEventsType > + queuePublisherManagerNoLazy: AmqpQueuePublisherManager< + CommonAmqpQueuePublisher, + TestEventsType + > + topicPublisherManager: AmqpTopicPublisherManager< CommonAmqpTopicPublisher, TestEventsType