Skip to content

Commit 36538f9

Browse files
authored
Adjust to new AMQP types (#254)
1 parent 308ebc6 commit 36538f9

6 files changed

Lines changed: 28 additions & 28 deletions

File tree

packages/amqp/lib/AbstractAmqpConsumer.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import type {
1111
TransactionObservabilityManager,
1212
} from '@message-queue-toolkit/core'
1313
import { HandlerContainer, isMessageError, parseMessage } from '@message-queue-toolkit/core'
14-
import type { Connection, Message } from 'amqplib'
14+
import type { ChannelModel, Message } from 'amqplib'
1515

1616
import type {
1717
AMQPConsumerDependencies,
@@ -123,7 +123,7 @@ export abstract class AbstractAmqpConsumer<
123123
await super.init()
124124
}
125125

126-
async receiveNewConnection(connection: Connection): Promise<void> {
126+
async receiveNewConnection(connection: ChannelModel): Promise<void> {
127127
await super.receiveNewConnection(connection)
128128
await this.consume()
129129
}

packages/amqp/lib/AbstractAmqpService.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import type {
55
QueueOptions,
66
} from '@message-queue-toolkit/core'
77
import { AbstractQueueService } from '@message-queue-toolkit/core'
8-
import type { Channel, Connection, Message } from 'amqplib'
8+
import type { Channel, ChannelModel, Message } from 'amqplib'
99
import type { Options } from 'amqplib/properties'
1010

1111
import type { AmqpConnectionManager, ConnectionReceiver } from './AmqpConnectionManager'
@@ -60,7 +60,7 @@ export abstract class AbstractAmqpService<
6060
>
6161
implements ConnectionReceiver
6262
{
63-
protected connection?: Connection
63+
protected connection?: ChannelModel
6464
private connectionManager: AmqpConnectionManager
6565
// @ts-ignore
6666
protected channel: Channel
@@ -78,7 +78,7 @@ export abstract class AbstractAmqpService<
7878
this.connectionManager.subscribeConnectionReceiver(this)
7979
}
8080

81-
async receiveNewConnection(connection: Connection) {
81+
async receiveNewConnection(connection: ChannelModel) {
8282
this.connection = connection
8383

8484
this.isShuttingDown = false

packages/amqp/lib/AmqpConnectionManager.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
1-
import type { Connection } from 'amqplib'
1+
import type { ChannelModel } from 'amqplib'
22

33
import type { CommonLogger } from '@lokalise/node-core'
44
import type { AmqpConfig } from './amqpConnectionResolver'
55
import { resolveAmqpConnection } from './amqpConnectionResolver'
66

77
export type ConnectionReceiver = {
8-
receiveNewConnection(connection: Connection): Promise<void>
8+
receiveNewConnection(connection: ChannelModel): Promise<void>
99
close(): Promise<void>
1010
}
1111

1212
export class AmqpConnectionManager {
1313
private readonly config: AmqpConfig
1414
private readonly logger: CommonLogger
1515
private readonly connectionReceivers: ConnectionReceiver[]
16-
private connection?: Connection
16+
private connection?: ChannelModel
1717
public reconnectsActive: boolean
1818

1919
public isReconnecting: boolean
@@ -26,7 +26,7 @@ export class AmqpConnectionManager {
2626
this.logger = logger
2727
}
2828

29-
private async createConnection() {
29+
private async createConnection(): Promise<ChannelModel> {
3030
const connection = await resolveAmqpConnection(this.config)
3131
connection.on('error', (err) => {
3232
this.logger.error(`AmqpConnectionManager: Connection error: ${err.message}`)
@@ -61,7 +61,7 @@ export class AmqpConnectionManager {
6161
return this.connection
6262
}
6363

64-
async getConnection(): Promise<Connection> {
64+
async getConnection(): Promise<ChannelModel> {
6565
if (!this.connection) {
6666
this.connection = await this.createConnection()
6767
}

packages/amqp/lib/amqpConnectionResolver.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { setTimeout } from 'node:timers/promises'
22

33
import { globalLogger } from '@lokalise/node-core'
4-
import { connect } from 'amqplib'
4+
import { type ChannelModel, connect } from 'amqplib'
55

66
const CONNECT_RETRY_SECONDS = 10
77
const MAX_RETRY_ATTEMPTS = 10
@@ -15,7 +15,7 @@ export type AmqpConfig = {
1515
useTls: boolean
1616
}
1717

18-
export async function resolveAmqpConnection(config: AmqpConfig) {
18+
export async function resolveAmqpConnection(config: AmqpConfig): Promise<ChannelModel> {
1919
const protocol = config.useTls ? 'amqps' : 'amqp'
2020
let counter = 0
2121
while (true) {

packages/amqp/lib/utils/amqpQueueUtils.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { DeletionConfig } from '@message-queue-toolkit/core'
22
import { isProduction } from '@message-queue-toolkit/core'
3-
import type { Channel, Connection } from 'amqplib'
3+
import type { Channel, ChannelModel } from 'amqplib'
44

55
import type {
66
AMQPQueueCreationConfig,
@@ -11,7 +11,7 @@ import type {
1111
} from '../AbstractAmqpService'
1212

1313
export async function checkQueueExists(
14-
connection: Connection,
14+
connection: ChannelModel,
1515
locatorConfig: AMQPQueueLocator,
1616
): Promise<void> {
1717
// queue check breaks channel if not successful
@@ -28,7 +28,7 @@ export async function checkQueueExists(
2828
}
2929

3030
export async function checkExchangeExists(
31-
connection: Connection,
31+
connection: ChannelModel,
3232
locatorConfig: AMQPTopicPublisherConfig,
3333
): Promise<void> {
3434
// exchange check breaks channel if not successful
@@ -45,7 +45,7 @@ export async function checkExchangeExists(
4545
}
4646

4747
export async function ensureAmqpQueue(
48-
connection: Connection,
48+
connection: ChannelModel,
4949
channel: Channel,
5050
creationConfig?: AMQPQueueCreationConfig,
5151
locatorConfig?: AMQPQueueLocator,
@@ -61,7 +61,7 @@ export async function ensureAmqpQueue(
6161
}
6262

6363
export async function ensureAmqpTopicSubscription(
64-
connection: Connection,
64+
connection: ChannelModel,
6565
channel: Channel,
6666
creationConfig?: AMQPTopicCreationConfig,
6767
locatorConfig?: AMQPTopicLocator,
@@ -84,7 +84,7 @@ export async function ensureAmqpTopicSubscription(
8484
}
8585

8686
export async function ensureExchange(
87-
connection: Connection,
87+
connection: ChannelModel,
8888
channel: Channel,
8989
creationConfig?: AMQPTopicPublisherConfig,
9090
locatorConfig?: AMQPTopicPublisherConfig,

packages/amqp/package.json

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,27 +25,27 @@
2525
"prepublishOnly": "npm run build:release"
2626
},
2727
"dependencies": {
28-
"@lokalise/node-core": "^13.3.0",
29-
"zod": "^3.23.8"
28+
"@lokalise/node-core": "^13.6.0",
29+
"zod": "^3.24.2"
3030
},
3131
"peerDependencies": {
3232
"@message-queue-toolkit/core": ">=20.0.0",
3333
"@message-queue-toolkit/schemas": ">=2.0.0",
34-
"amqplib": "^0.10.3"
34+
"amqplib": "^0.10.5"
3535
},
3636
"devDependencies": {
3737
"@biomejs/biome": "1.9.4",
3838
"@kibertoad/biome-config": "^1.2.1",
3939
"@message-queue-toolkit/core": "*",
40-
"@types/amqplib": "0.10.6",
40+
"@types/amqplib": "0.10.7",
4141
"@types/node": "^22.7.5",
42-
"@vitest/coverage-v8": "^3.0.7",
43-
"amqplib": "^0.10.4",
44-
"awilix": "^12.0.1",
45-
"awilix-manager": "^6.0.0",
42+
"@vitest/coverage-v8": "^3.0.9",
43+
"amqplib": "^0.10.5",
44+
"awilix": "^12.0.5",
45+
"awilix-manager": "^6.1.0",
4646
"del-cli": "^6.0.0",
47-
"typescript": "^5.7.2",
48-
"vitest": "^3.0.7"
47+
"typescript": "^5.8.2",
48+
"vitest": "^3.0.9"
4949
},
5050
"homepage": "https://github.com/kibertoad/message-queue-toolkit",
5151
"repository": {

0 commit comments

Comments
 (0)