Skip to content

Commit 595d357

Browse files
committed
Preserve AMQP delayed depth accuracy
Keep delayed queues tracked when the cleanup threshold is exceeded and prune only queues that RabbitMQ reports as missing. Reuse depth-probe channels per worker so checking many tracked queues no longer opens one channel for every queue in the common case. #748 (comment) #748 (comment) Assisted-by: Codex:gpt-5.5
1 parent b9b41a6 commit 595d357

2 files changed

Lines changed: 83 additions & 31 deletions

File tree

packages/amqp/src/mq.test.ts

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -159,26 +159,54 @@ unitTest(
159159
},
160160
);
161161

162-
unitTest("AmqpMessageQueue caps delayed queue tracking", async () => {
162+
unitTest("AmqpMessageQueue reuses depth probe channels", async () => {
163163
const conn = new FakeDepthConnection(0);
164164
const mq = new AmqpMessageQueue(conn as unknown as ChannelModel, {
165165
queue: "ready",
166166
delayedQueuePrefix: "delayed_",
167167
});
168168

169-
for (let milliseconds = 1; milliseconds <= 4097; milliseconds++) {
169+
for (let milliseconds = 1; milliseconds <= 12; milliseconds++) {
170170
await mq.enqueue("delayed", {
171171
delay: Temporal.Duration.from({ milliseconds }),
172172
});
173173
}
174+
conn.channelCount = 0;
174175

175176
assertEquals(await mq.getDepth(), {
176-
queued: 4096,
177+
queued: 12,
177178
ready: 0,
178-
delayed: 4096,
179+
delayed: 12,
179180
});
181+
assert(
182+
conn.channelCount <= 9,
183+
`expected at most 9 depth probe channels, got ${conn.channelCount}`,
184+
);
180185
});
181186

187+
unitTest(
188+
"AmqpMessageQueue keeps delayed queues past cleanup threshold",
189+
async () => {
190+
const conn = new FakeDepthConnection(0);
191+
const mq = new AmqpMessageQueue(conn as unknown as ChannelModel, {
192+
queue: "ready",
193+
delayedQueuePrefix: "delayed_",
194+
});
195+
196+
for (let milliseconds = 1; milliseconds <= 4097; milliseconds++) {
197+
await mq.enqueue("delayed", {
198+
delay: Temporal.Duration.from({ milliseconds }),
199+
});
200+
}
201+
202+
assertEquals(await mq.getDepth(), {
203+
queued: 4097,
204+
ready: 0,
205+
delayed: 4097,
206+
});
207+
},
208+
);
209+
182210
unitTest(
183211
"AmqpMessageQueue.getDepth() keeps delayed queues past local expiry",
184212
async () => {

packages/amqp/src/mq.ts

Lines changed: 51 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ function isPreconditionFailedError(error: unknown): boolean {
2020

2121
const depthProbeConcurrency = 8;
2222
const delayedQueueExpiryMargin = 60_000;
23-
const delayedQueueTrackingLimit = 4096;
23+
const delayedQueueCleanupThreshold = 4096;
2424

2525
/**
2626
* Options for ordering key support in {@link AmqpMessageQueue}.
@@ -143,6 +143,7 @@ export class AmqpMessageQueue implements MessageQueue {
143143
partitions: number;
144144
};
145145
#delayedQueues: Set<string> = new Set();
146+
#delayedQueueCleanup?: Promise<void>;
146147
#orderingPrepared: boolean = false;
147148

148149
readonly nativeRetrial: boolean;
@@ -384,10 +385,15 @@ export class AmqpMessageQueue implements MessageQueue {
384385

385386
#trackDelayedQueue(queue: string): void {
386387
this.#delayedQueues.add(queue);
387-
while (this.#delayedQueues.size > delayedQueueTrackingLimit) {
388-
const oldestQueue = this.#delayedQueues.values().next().value;
389-
if (oldestQueue == null) break;
390-
this.#delayedQueues.delete(oldestQueue);
388+
if (
389+
this.#delayedQueues.size > delayedQueueCleanupThreshold &&
390+
this.#delayedQueueCleanup == null
391+
) {
392+
this.#delayedQueueCleanup = this.#pruneMissingDelayedQueues()
393+
.catch(() => undefined)
394+
.finally(() => {
395+
this.#delayedQueueCleanup = undefined;
396+
});
391397
}
392398
}
393399

@@ -429,24 +435,6 @@ export class AmqpMessageQueue implements MessageQueue {
429435
return channel;
430436
}
431437

432-
async #checkQueueDepth(queueName: string): Promise<number | undefined> {
433-
const channel = await this.#createDepthChannel();
434-
try {
435-
return (await channel.checkQueue(queueName)).messageCount;
436-
} catch (error) {
437-
if (!isQueueNotFoundError(error)) {
438-
throw error;
439-
}
440-
return undefined;
441-
} finally {
442-
try {
443-
await channel.close();
444-
} catch {
445-
// The channel can already be closed by a failed passive queue check.
446-
}
447-
}
448-
}
449-
450438
async #checkQueueDepths(
451439
queueNames: readonly string[],
452440
): Promise<readonly (readonly [string, number | undefined])[]> {
@@ -455,10 +443,37 @@ export class AmqpMessageQueue implements MessageQueue {
455443
);
456444
let nextIndex = 0;
457445
const worker = async () => {
458-
while (nextIndex < queueNames.length) {
459-
const index = nextIndex++;
460-
const queue = queueNames[index];
461-
results[index] = [queue, await this.#checkQueueDepth(queue)];
446+
let channel: Channel | undefined;
447+
const closeChannel = async () => {
448+
if (channel == null) return;
449+
const currentChannel = channel;
450+
channel = undefined;
451+
try {
452+
await currentChannel.close();
453+
} catch {
454+
// The channel can already be closed by a failed passive queue check.
455+
}
456+
};
457+
const checkQueue = async (
458+
queue: string,
459+
): Promise<number | undefined> => {
460+
channel ??= await this.#createDepthChannel();
461+
try {
462+
return (await channel.checkQueue(queue)).messageCount;
463+
} catch (error) {
464+
await closeChannel();
465+
if (!isQueueNotFoundError(error)) throw error;
466+
return undefined;
467+
}
468+
};
469+
try {
470+
while (nextIndex < queueNames.length) {
471+
const index = nextIndex++;
472+
const queue = queueNames[index];
473+
results[index] = [queue, await checkQueue(queue)];
474+
}
475+
} finally {
476+
await closeChannel();
462477
}
463478
};
464479
const workers = Array.from(
@@ -469,6 +484,15 @@ export class AmqpMessageQueue implements MessageQueue {
469484
return results;
470485
}
471486

487+
async #pruneMissingDelayedQueues(): Promise<void> {
488+
const delayedQueues = [...this.#delayedQueues];
489+
for (
490+
const [queue, messageCount] of await this.#checkQueueDepths(delayedQueues)
491+
) {
492+
if (messageCount == null) this.#delayedQueues.delete(queue);
493+
}
494+
}
495+
472496
async getDepth(): Promise<MessageQueueDepth> {
473497
const readyQueues = [this.#queue];
474498
if (this.#ordering != null) {

0 commit comments

Comments
 (0)