Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/amqp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,7 @@ publisher.publish(message)
// This will succeed and consumer, which also received new connection, will be able to consume it
publisher.publish(message)
```

## Lazy instantiation

By default `message-queue-toolkit/amqp` will lazily instantiate publishers before the publish in case they weren't instantiated before. This may result in system not failing fast when connection to AMQP cannot be established, as instantiation happens asynchronously. In case that is an undesirable behaviour, you can set parameter `isLazyInitEnabled` to false either directly for a publisher, or as one of the `newPublisherOptions` parameters for the `AmqpQueuePublisherManager`.
7 changes: 5 additions & 2 deletions packages/amqp/lib/AbstractAmqpPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export type AMQPPublisherOptions<
LocatorConfig extends object,
> = QueuePublisherOptions<CreationConfig, LocatorConfig, MessagePayloadType> & {
exchange?: string
isLazyInitEnabled?: boolean // default is true
}

export abstract class AbstractAmqpPublisher<
Expand All @@ -40,9 +41,10 @@ export abstract class AbstractAmqpPublisher<
>
implements SyncPublisher<MessagePayloadType, MessageOptionsType>
{
private readonly messageSchemaContainer: MessageSchemaContainer<MessagePayloadType>
protected readonly exchange?: string

private readonly messageSchemaContainer: MessageSchemaContainer<MessagePayloadType>
private readonly isLazyInitEnabled: boolean
private initPromise?: Promise<void>

constructor(
Expand All @@ -53,6 +55,7 @@ export abstract class AbstractAmqpPublisher<

this.messageSchemaContainer = this.resolvePublisherMessageSchemaContainer(options)
this.exchange = options.exchange
this.isLazyInitEnabled = options.isLazyInitEnabled ?? true
}

publish(message: MessagePayloadType, options: MessageOptionsType): void {
Expand All @@ -64,7 +67,7 @@ export abstract class AbstractAmqpPublisher<
const messageProcessingStartTimestamp = Date.now()

// If it's not initted yet, do the lazy init
if (!this.isInitted) {
if (!this.isInitted && this.isLazyInitEnabled) {
// avoid multiple concurrent inits
if (!this.initPromise) {
this.initPromise = this.init()
Expand Down
18 changes: 18 additions & 0 deletions packages/amqp/lib/AmqpQueuePublisherManager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,23 @@ describe('AmqpQueuePublisherManager', () => {
}
`)
})

it('skips lazy init if not enabled', async () => {
const { queuePublisherManagerNoLazy } = diContainer.cradle
const fakeConsumer = new FakeQueueConsumer(diContainer.cradle, TestEvents.updated)
await fakeConsumer.start()

expect(() =>
queuePublisherManagerNoLazy.publishSync(FakeQueueConsumer.QUEUE_NAME, {
type: 'entity.updated',
payload: {
updatedData: 'msg',
},
metadata: {
correlationId: 'some-id',
},
}),
).toThrow(/Error while publishing to AMQP Cannot read properties of undefined/)
})
})
})
29 changes: 29 additions & 0 deletions packages/amqp/test/utils/testContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ export async function registerDependencies(
eventRegistry: asFunction(() => {
return new EventRegistry(Object.values(TestEvents))
}, SINGLETON_CONFIG),

queuePublisherManager: asFunction(
(dependencies) => {
return new AmqpQueuePublisherManager(dependencies, {
Expand All @@ -149,6 +150,7 @@ export async function registerDependencies(
}),
publisherFactory: new CommonAmqpQueuePublisherFactory(),
newPublisherOptions: {
isLazyInitEnabled: true,
handlerSpy: true,
messageIdField: 'id',
messageTypeField: 'type',
Expand All @@ -160,6 +162,28 @@ export async function registerDependencies(
enabled: queuesEnabled,
},
),

queuePublisherManagerNoLazy: asFunction(
(dependencies) => {
return new AmqpQueuePublisherManager(dependencies, {
metadataFiller: new CommonMetadataFiller({
serviceId: 'service',
}),
publisherFactory: new CommonAmqpQueuePublisherFactory(),
newPublisherOptions: {
isLazyInitEnabled: false,
handlerSpy: true,
messageIdField: 'id',
messageTypeField: 'type',
},
})
},
{
lifetime: Lifetime.SINGLETON,
enabled: queuesEnabled,
},
),

topicPublisherManager: asFunction(
(dependencies) => {
return new AmqpTopicPublisherManager(dependencies, {
Expand Down Expand Up @@ -225,6 +249,11 @@ export interface Dependencies {
TestEventsType
>

queuePublisherManagerNoLazy: AmqpQueuePublisherManager<
CommonAmqpQueuePublisher<TestEventPublishPayloadsType>,
TestEventsType
>

topicPublisherManager: AmqpTopicPublisherManager<
CommonAmqpTopicPublisher<TestEventPublishPayloadsType>,
TestEventsType
Expand Down
Loading