Skip to content

Commit 07173a5

Browse files
committed
Keep AMQP depth probes passive
Avoid declaring queues or ordering exchanges while reporting AMQP queue depth. Missing queues now count as zero while preserving the existing channel recreation path for passive checks, and delayed queue tracking is iterated without copying all keys first. #748 (comment) #748 (comment) Assisted-by: Codex:gpt-5.5
1 parent 3459af2 commit 07173a5

1 file changed

Lines changed: 22 additions & 15 deletions

File tree

packages/amqp/src/mq.ts

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -398,30 +398,37 @@ export class AmqpMessageQueue implements MessageQueue {
398398
// The channel can already be closed by a failed passive queue check.
399399
}
400400
};
401+
const checkQueue = async (
402+
queueName: string,
403+
): Promise<number | undefined> => {
404+
if (channel == null) channel = await this.#createDepthChannel();
405+
try {
406+
return (await channel.checkQueue(queueName)).messageCount;
407+
} catch (error) {
408+
if (!isQueueNotFoundError(error)) {
409+
throw error;
410+
}
411+
await closeChannel();
412+
channel = await this.#createDepthChannel();
413+
return undefined;
414+
}
415+
};
401416
try {
402-
await this.#prepareQueue(channel);
403-
await this.#prepareOrdering(channel);
404-
405-
let ready = (await channel.checkQueue(this.#queue)).messageCount;
417+
let ready = (await checkQueue(this.#queue)) ?? 0;
406418
if (this.#ordering != null) {
407419
for (let i = 0; i < this.#ordering.partitions; i++) {
408-
ready += (await channel.checkQueue(this.#getOrderingQueueName(i)))
409-
.messageCount;
420+
ready += (await checkQueue(this.#getOrderingQueueName(i))) ?? 0;
410421
}
411422
}
412423

413424
let delayed = 0;
414425
this.#pruneDelayedQueues();
415-
for (const queue of [...this.#delayedQueues.keys()]) {
416-
try {
417-
delayed += (await channel.checkQueue(queue)).messageCount;
418-
} catch (error) {
419-
if (!isQueueNotFoundError(error)) {
420-
throw error;
421-
}
426+
for (const queue of this.#delayedQueues.keys()) {
427+
const messageCount = await checkQueue(queue);
428+
if (messageCount == null) {
422429
this.#delayedQueues.delete(queue);
423-
await closeChannel();
424-
channel = await this.#createDepthChannel();
430+
} else {
431+
delayed += messageCount;
425432
}
426433
}
427434

0 commit comments

Comments
 (0)