Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/amqp/lib/AbstractAmqpConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type {
TransactionObservabilityManager,
} from '@message-queue-toolkit/core'
import { HandlerContainer, isMessageError, parseMessage } from '@message-queue-toolkit/core'
import type { Connection, Message } from 'amqplib'
import type { ChannelModel, Message } from 'amqplib'

import type {
AMQPConsumerDependencies,
Expand Down Expand Up @@ -123,7 +123,7 @@ export abstract class AbstractAmqpConsumer<
await super.init()
}

async receiveNewConnection(connection: Connection): Promise<void> {
async receiveNewConnection(connection: ChannelModel): Promise<void> {
await super.receiveNewConnection(connection)
await this.consume()
}
Expand Down
6 changes: 3 additions & 3 deletions packages/amqp/lib/AbstractAmqpService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type {
QueueOptions,
} from '@message-queue-toolkit/core'
import { AbstractQueueService } from '@message-queue-toolkit/core'
import type { Channel, Connection, Message } from 'amqplib'
import type { Channel, ChannelModel, Message } from 'amqplib'
import type { Options } from 'amqplib/properties'

import type { AmqpConnectionManager, ConnectionReceiver } from './AmqpConnectionManager'
Expand Down Expand Up @@ -60,7 +60,7 @@ export abstract class AbstractAmqpService<
>
implements ConnectionReceiver
{
protected connection?: Connection
protected connection?: ChannelModel
private connectionManager: AmqpConnectionManager
// @ts-ignore
protected channel: Channel
Expand All @@ -78,7 +78,7 @@ export abstract class AbstractAmqpService<
this.connectionManager.subscribeConnectionReceiver(this)
}

async receiveNewConnection(connection: Connection) {
async receiveNewConnection(connection: ChannelModel) {
this.connection = connection

this.isShuttingDown = false
Expand Down
10 changes: 5 additions & 5 deletions packages/amqp/lib/AmqpConnectionManager.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import type { Connection } from 'amqplib'
import type { ChannelModel } from 'amqplib'

import type { CommonLogger } from '@lokalise/node-core'
import type { AmqpConfig } from './amqpConnectionResolver'
import { resolveAmqpConnection } from './amqpConnectionResolver'

export type ConnectionReceiver = {
receiveNewConnection(connection: Connection): Promise<void>
receiveNewConnection(connection: ChannelModel): Promise<void>
close(): Promise<void>
}

export class AmqpConnectionManager {
private readonly config: AmqpConfig
private readonly logger: CommonLogger
private readonly connectionReceivers: ConnectionReceiver[]
private connection?: Connection
private connection?: ChannelModel
public reconnectsActive: boolean

public isReconnecting: boolean
Expand All @@ -26,7 +26,7 @@ export class AmqpConnectionManager {
this.logger = logger
}

private async createConnection() {
private async createConnection(): Promise<ChannelModel> {
const connection = await resolveAmqpConnection(this.config)
connection.on('error', (err) => {
this.logger.error(`AmqpConnectionManager: Connection error: ${err.message}`)
Expand Down Expand Up @@ -61,7 +61,7 @@ export class AmqpConnectionManager {
return this.connection
}

async getConnection(): Promise<Connection> {
async getConnection(): Promise<ChannelModel> {
if (!this.connection) {
this.connection = await this.createConnection()
}
Expand Down
4 changes: 2 additions & 2 deletions packages/amqp/lib/amqpConnectionResolver.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { setTimeout } from 'node:timers/promises'

import { globalLogger } from '@lokalise/node-core'
import { connect } from 'amqplib'
import { type ChannelModel, connect } from 'amqplib'

const CONNECT_RETRY_SECONDS = 10
const MAX_RETRY_ATTEMPTS = 10
Expand All @@ -15,7 +15,7 @@ export type AmqpConfig = {
useTls: boolean
}

export async function resolveAmqpConnection(config: AmqpConfig) {
export async function resolveAmqpConnection(config: AmqpConfig): Promise<ChannelModel> {
const protocol = config.useTls ? 'amqps' : 'amqp'
let counter = 0
while (true) {
Expand Down
12 changes: 6 additions & 6 deletions packages/amqp/lib/utils/amqpQueueUtils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { DeletionConfig } from '@message-queue-toolkit/core'
import { isProduction } from '@message-queue-toolkit/core'
import type { Channel, Connection } from 'amqplib'
import type { Channel, ChannelModel } from 'amqplib'

import type {
AMQPQueueCreationConfig,
Expand All @@ -11,7 +11,7 @@ import type {
} from '../AbstractAmqpService'

export async function checkQueueExists(
connection: Connection,
connection: ChannelModel,
locatorConfig: AMQPQueueLocator,
): Promise<void> {
// queue check breaks channel if not successful
Expand All @@ -28,7 +28,7 @@ export async function checkQueueExists(
}

export async function checkExchangeExists(
connection: Connection,
connection: ChannelModel,
locatorConfig: AMQPTopicPublisherConfig,
): Promise<void> {
// exchange check breaks channel if not successful
Expand All @@ -45,7 +45,7 @@ export async function checkExchangeExists(
}

export async function ensureAmqpQueue(
connection: Connection,
connection: ChannelModel,
channel: Channel,
creationConfig?: AMQPQueueCreationConfig,
locatorConfig?: AMQPQueueLocator,
Expand All @@ -61,7 +61,7 @@ export async function ensureAmqpQueue(
}

export async function ensureAmqpTopicSubscription(
connection: Connection,
connection: ChannelModel,
channel: Channel,
creationConfig?: AMQPTopicCreationConfig,
locatorConfig?: AMQPTopicLocator,
Expand All @@ -84,7 +84,7 @@ export async function ensureAmqpTopicSubscription(
}

export async function ensureExchange(
connection: Connection,
connection: ChannelModel,
channel: Channel,
creationConfig?: AMQPTopicPublisherConfig,
locatorConfig?: AMQPTopicPublisherConfig,
Expand Down
20 changes: 10 additions & 10 deletions packages/amqp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,27 @@
"prepublishOnly": "npm run build:release"
},
"dependencies": {
"@lokalise/node-core": "^13.3.0",
"zod": "^3.23.8"
"@lokalise/node-core": "^13.6.0",
"zod": "^3.24.2"
},
"peerDependencies": {
"@message-queue-toolkit/core": ">=20.0.0",
"@message-queue-toolkit/schemas": ">=2.0.0",
"amqplib": "^0.10.3"
"amqplib": "^0.10.5"
},
"devDependencies": {
"@biomejs/biome": "1.9.4",
"@kibertoad/biome-config": "^1.2.1",
"@message-queue-toolkit/core": "*",
"@types/amqplib": "0.10.6",
"@types/amqplib": "0.10.7",
"@types/node": "^22.7.5",
"@vitest/coverage-v8": "^3.0.7",
"amqplib": "^0.10.4",
"awilix": "^12.0.1",
"awilix-manager": "^6.0.0",
"@vitest/coverage-v8": "^3.0.9",
"amqplib": "^0.10.5",
"awilix": "^12.0.5",
"awilix-manager": "^6.1.0",
"del-cli": "^6.0.0",
"typescript": "^5.7.2",
"vitest": "^3.0.7"
"typescript": "^5.8.2",
"vitest": "^3.0.9"
},
"homepage": "https://github.com/kibertoad/message-queue-toolkit",
"repository": {
Expand Down
Loading