Skip to content

Commit 27bfb3b

Browse files
authored
Make it possible to disable lazy init for AMQP (#257)
1 parent 8d92634 commit 27bfb3b

4 files changed

Lines changed: 56 additions & 2 deletions

File tree

packages/amqp/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,7 @@ publisher.publish(message)
6666
// This will succeed and consumer, which also received new connection, will be able to consume it
6767
publisher.publish(message)
6868
```
69+
70+
## Lazy instantiation
71+
72+
By default `message-queue-toolkit/amqp` will lazily instantiate publishers before the publish in case they weren't instantiated before. This may result in system not failing fast when connection to AMQP cannot be established, as instantiation happens asynchronously. In case that is an undesirable behaviour, you can set parameter `isLazyInitEnabled` to false either directly for a publisher, or as one of the `newPublisherOptions` parameters for the `AmqpQueuePublisherManager`.

packages/amqp/lib/AbstractAmqpPublisher.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ export type AMQPPublisherOptions<
2222
LocatorConfig extends object,
2323
> = QueuePublisherOptions<CreationConfig, LocatorConfig, MessagePayloadType> & {
2424
exchange?: string
25+
isLazyInitEnabled?: boolean // default is true
2526
}
2627

2728
export abstract class AbstractAmqpPublisher<
@@ -40,9 +41,10 @@ export abstract class AbstractAmqpPublisher<
4041
>
4142
implements SyncPublisher<MessagePayloadType, MessageOptionsType>
4243
{
43-
private readonly messageSchemaContainer: MessageSchemaContainer<MessagePayloadType>
4444
protected readonly exchange?: string
4545

46+
private readonly messageSchemaContainer: MessageSchemaContainer<MessagePayloadType>
47+
private readonly isLazyInitEnabled: boolean
4648
private initPromise?: Promise<void>
4749

4850
constructor(
@@ -53,6 +55,7 @@ export abstract class AbstractAmqpPublisher<
5355

5456
this.messageSchemaContainer = this.resolvePublisherMessageSchemaContainer(options)
5557
this.exchange = options.exchange
58+
this.isLazyInitEnabled = options.isLazyInitEnabled ?? true
5659
}
5760

5861
publish(message: MessagePayloadType, options: MessageOptionsType): void {
@@ -64,7 +67,7 @@ export abstract class AbstractAmqpPublisher<
6467
const messageProcessingStartTimestamp = Date.now()
6568

6669
// If it's not initted yet, do the lazy init
67-
if (!this.isInitted) {
70+
if (!this.isInitted && this.isLazyInitEnabled) {
6871
// avoid multiple concurrent inits
6972
if (!this.initPromise) {
7073
this.initPromise = this.init()

packages/amqp/lib/AmqpQueuePublisherManager.spec.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,5 +57,23 @@ describe('AmqpQueuePublisherManager', () => {
5757
}
5858
`)
5959
})
60+
61+
it('skips lazy init if not enabled', async () => {
62+
const { queuePublisherManagerNoLazy } = diContainer.cradle
63+
const fakeConsumer = new FakeQueueConsumer(diContainer.cradle, TestEvents.updated)
64+
await fakeConsumer.start()
65+
66+
expect(() =>
67+
queuePublisherManagerNoLazy.publishSync(FakeQueueConsumer.QUEUE_NAME, {
68+
type: 'entity.updated',
69+
payload: {
70+
updatedData: 'msg',
71+
},
72+
metadata: {
73+
correlationId: 'some-id',
74+
},
75+
}),
76+
).toThrow(/Error while publishing to AMQP Cannot read properties of undefined/)
77+
})
6078
})
6179
})

packages/amqp/test/utils/testContext.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ export async function registerDependencies(
141141
eventRegistry: asFunction(() => {
142142
return new EventRegistry(Object.values(TestEvents))
143143
}, SINGLETON_CONFIG),
144+
144145
queuePublisherManager: asFunction(
145146
(dependencies) => {
146147
return new AmqpQueuePublisherManager(dependencies, {
@@ -149,6 +150,7 @@ export async function registerDependencies(
149150
}),
150151
publisherFactory: new CommonAmqpQueuePublisherFactory(),
151152
newPublisherOptions: {
153+
isLazyInitEnabled: true,
152154
handlerSpy: true,
153155
messageIdField: 'id',
154156
messageTypeField: 'type',
@@ -160,6 +162,28 @@ export async function registerDependencies(
160162
enabled: queuesEnabled,
161163
},
162164
),
165+
166+
queuePublisherManagerNoLazy: asFunction(
167+
(dependencies) => {
168+
return new AmqpQueuePublisherManager(dependencies, {
169+
metadataFiller: new CommonMetadataFiller({
170+
serviceId: 'service',
171+
}),
172+
publisherFactory: new CommonAmqpQueuePublisherFactory(),
173+
newPublisherOptions: {
174+
isLazyInitEnabled: false,
175+
handlerSpy: true,
176+
messageIdField: 'id',
177+
messageTypeField: 'type',
178+
},
179+
})
180+
},
181+
{
182+
lifetime: Lifetime.SINGLETON,
183+
enabled: queuesEnabled,
184+
},
185+
),
186+
163187
topicPublisherManager: asFunction(
164188
(dependencies) => {
165189
return new AmqpTopicPublisherManager(dependencies, {
@@ -225,6 +249,11 @@ export interface Dependencies {
225249
TestEventsType
226250
>
227251

252+
queuePublisherManagerNoLazy: AmqpQueuePublisherManager<
253+
CommonAmqpQueuePublisher<TestEventPublishPayloadsType>,
254+
TestEventsType
255+
>
256+
228257
topicPublisherManager: AmqpTopicPublisherManager<
229258
CommonAmqpTopicPublisher<TestEventPublishPayloadsType>,
230259
TestEventsType

0 commit comments

Comments
 (0)