Skip to content

Commit 7d04cac

Browse files
authored
feat: connection reworked (#8)
* feat: connection reworked * chore: update
1 parent 51221b1 commit 7d04cac

File tree

1 file changed

+19
-6
lines changed

1 file changed

+19
-6
lines changed

packages/queue/src/repository.ts

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,17 @@ export class Repository implements QueueRepository {
1919
return this.connection.ready
2020
}
2121

22-
async connect(connectionString: string) {
23-
this.#initConnection(connectionString)
22+
async connect(connectionString: string): Promise<void> {
23+
try {
24+
await this.#initConnection(connectionString)
2425

25-
await this.#declareExchanges()
26-
await this.#declareQueues()
27-
await this.#declareBindings()
26+
await this.#declareExchanges()
27+
await this.#declareQueues()
28+
await this.#declareBindings()
29+
} catch (error) {
30+
console.error('RabbitMQ error on init connection', error)
31+
throw error
32+
}
2833
}
2934

3035
get publisher(): Publisher {
@@ -97,15 +102,23 @@ export class Repository implements QueueRepository {
97102
}
98103
}
99104

100-
#initConnection(connectionString: string): void {
105+
async #initConnection(connectionString: string): Promise<void> {
101106
const connection = new Connection({
102107
url: connectionString,
103108
})
104109

110+
connection.on('connection', () => {
111+
// eslint-disable-next-line no-console
112+
console.debug('RabbitMQ connection is successfully (re)established')
113+
})
114+
105115
connection.on('error', (err) => {
106116
console.error('RabbitMQ connection error', err)
107117
})
108118

119+
// Wait for connection to be ready
120+
await connection.onConnect(120_000)
121+
109122
this.#connection = connection
110123
}
111124

0 commit comments

Comments
 (0)