Skip to content

Commit a577b6d

Browse files
authored
chore: connect update (#10)
* chore: connect update * chore: update
1 parent 94c0c61 commit a577b6d

File tree

4 files changed

+25
-15
lines changed

4 files changed

+25
-15
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,4 @@ coverage
2727
.vitepress/cache
2828

2929
packages/**/__screenshots__
30+
WARP.md

examples/microservices/service1/service-start.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { repository } from '../repository'
44
// Connect to RabbitMQ
55
async function init() {
66
try {
7-
await repository.connect('amqp://guest:guest@localhost:5672')
7+
await repository.connect('amqp://guest:guest@localhost:5672', 10)
88
} catch (error) {
99
console.error(error)
1010
}

examples/microservices/service2/service-start.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { repository } from '../repository'
44
// Connect to RabbitMQ
55
async function init() {
66
try {
7-
await repository.connect('amqp://guest:guest@localhost:5672')
7+
await repository.connect('amqp://guest:guest@localhost:5672', 10)
88
} catch (error) {
99
console.error(error)
1010
}

packages/queue/src/repository.ts

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,32 +22,30 @@ export class Repository implements QueueRepository {
2222
/**
2323
* Create a connection to RabbitMQ
2424
* @param connectionString
25-
* @param retryCount
25+
* @param retryCount - Number of retry attempts. Set to Infinity for infinite retries (default: Infinity)
2626
* @returns Promise<void>
2727
*/
28-
async connect(connectionString: string, retryCount = 5): Promise<void> {
29-
for (let attempt = 1; attempt <= retryCount; attempt++) {
28+
async connect(connectionString: string, retryCount = Infinity): Promise<void> {
29+
let attempt = 1
30+
while (attempt <= retryCount) {
3031
try {
3132
await this.#initConnection(connectionString)
3233
break
3334
} catch (err) {
34-
console.error('RabbitMQ: Failed to connect. Retrying...', err)
35+
console.error(`RabbitMQ: Failed to connect (attempt ${attempt}/${retryCount === Infinity ? '∞' : retryCount}). Retrying...`, err)
3536
if (attempt === retryCount) {
36-
throw new Error('RabbitMQ: Failed to connect after some retries')
37+
throw new Error('RabbitMQ: Failed to connect after maximum retries')
3738
}
38-
// basic exponential backoff capped at 10s
39+
// exponential backoff capped at 10s
3940
const delay = Math.min(1000 * 2 ** (attempt - 1), 10_000)
4041
await new Promise((res) => setTimeout(res, delay))
42+
attempt++
4143
}
4244
}
4345

44-
try {
45-
await this.#declareExchanges()
46-
await this.#declareQueues()
47-
await this.#declareBindings()
48-
} catch (error) {
49-
console.error('RabbitMQ: Failed to declare exchanges, queues, and bindings', error)
50-
}
46+
await this.#declareExchanges()
47+
await this.#declareQueues()
48+
await this.#declareBindings()
5149
}
5250

5351
get publisher(): Publisher {
@@ -121,6 +119,17 @@ export class Repository implements QueueRepository {
121119
}
122120

123121
async #initConnection(connectionString: string): Promise<void> {
122+
// Clean up previous connection if exists
123+
if (this.#connection) {
124+
try {
125+
await this.#connection.close()
126+
} catch {
127+
// Ignore cleanup errors
128+
}
129+
this.#connection = null
130+
this.#publisher = null
131+
}
132+
124133
const connection = new Connection({
125134
url: connectionString,
126135
})

0 commit comments

Comments
 (0)