Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,43 @@ Version 2.3.0

To be released.

### @fedify/fedify

- Added optional `MessageQueue.getDepth()` support, using the new
`MessageQueueDepth` return type, for reporting queue backlog depth.
`InProcessMessageQueue` can now report queued messages, including ready
and delayed counts, and `ParallelMessageQueue` delegates depth reporting
to its wrapped queue when supported. [[#735], [#748]]

[#735]: https://github.com/fedify-dev/fedify/issues/735
[#748]: https://github.com/fedify-dev/fedify/pull/748

### @fedify/amqp

- Added `AmqpMessageQueue.getDepth()` for reporting queued, ready, and
delayed message counts. Delayed counts include queues created or tracked
by the same `AmqpMessageQueue` instance. [[#735], [#748]]

### @fedify/mysql

- Added `MysqlMessageQueue.getDepth()` for reporting queued, ready, and
delayed message counts. [[#735], [#748]]

### @fedify/postgres

- Added `PostgresMessageQueue.getDepth()` for reporting queued, ready, and
delayed message counts. [[#735], [#748]]

### @fedify/redis

- Added `RedisMessageQueue.getDepth()` for reporting queued, ready, and
delayed message counts. [[#735], [#748]]

### @fedify/sqlite

- Added `SqliteMessageQueue.getDepth()` for reporting queued, ready, and
delayed message counts. [[#735], [#748]]


Version 2.2.0
-------------
Expand Down
80 changes: 80 additions & 0 deletions docs/manual/mq.md
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,7 @@ the `~MessageQueue.enqueue()` and `~MessageQueue.listen()` methods:
~~~~ typescript twoslash
import type {
MessageQueue,
MessageQueueDepth,
MessageQueueEnqueueOptions,
MessageQueueListenOptions,
} from "@fedify/fedify";
Expand All @@ -693,6 +694,11 @@ class CustomMessageQueue implements MessageQueue {
): Promise<void> {
// Implementation here
}

// Optional: implement only if your backend can report real counts.
// async getDepth(): Promise<MessageQueueDepth> {
// return { queued, ready, delayed };
// }
}
~~~~

Expand Down Expand Up @@ -747,6 +753,21 @@ you can set the `nativeRetrial` property to `true` to indicate this.
When this property is `true`, Fedify will skip its own retry logic and rely
on your backend to handle retries, avoiding duplicate retry mechanisms.

### Implement `~MessageQueue.getDepth()` method (optional)

*This API is available since Fedify 2.3.0.*

This optional method should return the number of messages still waiting in the
backend queue. It should not include messages that have already been handed to
a worker for processing. Return `queued` for the total waiting messages. If
your backend can cheaply distinguish scheduled messages, also return `ready`
for messages eligible for immediate processing and `delayed` for messages
scheduled for later delivery.

Implement this method if your queue backend exposes an efficient count
operation. If the platform does not expose reliable counts, omit the method
rather than returning an approximate value that could mislead monitoring.


Parallel message processing
---------------------------
Expand Down Expand Up @@ -951,6 +972,65 @@ Optimized performance
: Backend-specific optimizations for retry logic.


Queue depth reporting
---------------------

*This API is available since Fedify 2.3.0.*

Some message queue implementations expose `~MessageQueue.getDepth()` for
observability. Queue depth means messages still waiting in the backend queue:

`queued`
: Total waiting messages. This excludes messages currently being handled by
a worker.

`ready`
: Waiting messages eligible for immediate processing. This value is omitted
when the backend cannot distinguish ready and delayed messages cheaply.

`delayed`
: Waiting messages scheduled for later delivery. This value is omitted when
the backend cannot distinguish ready and delayed messages cheaply.

For example:

~~~~ typescript twoslash
import type { MessageQueue } from "@fedify/fedify";
declare const queue: MessageQueue;
// ---cut-before---
const depth = await queue.getDepth?.();
if (depth != null) {
console.log("Queued messages:", depth.queued);
}
~~~~

### Implementation support

| Implementation | Queue Depth Support |
| ------------------------ | ----------------------------------------- |
| `InProcessMessageQueue` | `queued`, `ready`, `delayed` |
| [`DenoKvMessageQueue`] | No reliable platform count |
| [`RedisMessageQueue`] | `queued`, `ready`, `delayed` |
| [`PostgresMessageQueue`] | `queued`, `ready`, `delayed` |
| [`MysqlMessageQueue`] | `queued`, `ready`, `delayed` |
| [`AmqpMessageQueue`] | `queued`, `ready`, `delayed`[^amqp-depth] |
| [`SqliteMessageQueue`] | `queued`, `ready`, `delayed` |
| `WorkersMessageQueue` | No reliable platform count |
Comment thread
coderabbitai[bot] marked this conversation as resolved.
| `ParallelMessageQueue` | Same as wrapped queue |

If you pass the same `MessageQueue` instance as the shared queue for inbox,
outbox, and fanout work, observability code should report that queue once as a
shared queue. Reporting the same `getDepth()` result separately for each
logical role would double- or triple-count the backlog.

[^amqp-depth]: `AmqpMessageQueue` can count the configured ready queues and
delayed queues created by the same `AmqpMessageQueue` instance.
AMQP 0-9-1 does not provide a portable queue-listing API, so
delayed queues created by another process before this instance
starts are not included until this instance creates or tracks
them.


Ordering guarantees
-------------------

Expand Down
44 changes: 44 additions & 0 deletions packages/amqp/src/mq.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import { suite } from "@alinea/suite";
import { AmqpMessageQueue } from "@fedify/amqp/mq";
import { getRandomKey, testMessageQueue, waitFor } from "@fedify/testing";
import * as temporal from "@js-temporal/polyfill";
import { assert, assertEquals, assertFalse, assertGreater } from "@std/assert";
import { delay } from "@std/async/delay";
// @deno-types="npm:@types/amqplib"
import { type ChannelModel, connect } from "amqplib";
import process from "node:process";

const Temporal = globalThis.Temporal ?? temporal.Temporal;

const AMQP_URL = process.env.AMQP_URL;
const test = AMQP_URL ? suite(import.meta) : suite(import.meta).skip;

Expand Down Expand Up @@ -37,6 +40,47 @@ test(
),
);

test(
"AmqpMessageQueue.getDepth()",
{ sanitizeOps: false, sanitizeExit: false, sanitizeResources: false },
async () => {
const conn = await getConnection();
const queue = getRandomKey("depth_queue");
const delayedQueuePrefix = getRandomKey("depth_delayed") + "_";
const mq = new AmqpMessageQueue(conn, { queue, delayedQueuePrefix });
try {
assertEquals(await mq.getDepth(), {
queued: 0,
ready: 0,
delayed: 0,
});
await mq.enqueue("ready");
await mq.enqueue("delayed", {
delay: Temporal.Duration.from({ seconds: 60 }),
});
const started = Date.now();
while (Date.now() - started < 15_000) {
const depth = await mq.getDepth();
if (depth.queued === 2 && depth.ready === 1 && depth.delayed === 1) {
break;
}
await delay(100);
}
assertEquals(await mq.getDepth(), {
queued: 2,
ready: 1,
delayed: 1,
});
} finally {
const channel = await conn.createChannel();
await channel.deleteQueue(queue);
await channel.deleteQueue(`${delayedQueuePrefix}60000`).catch(() => {});
await channel.close();
await conn.close();
}
},
);
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// Test with ordering key support (requires rabbitmq_consistent_hash_exchange plugin)
const orderingConnections: ChannelModel[] = [];
const orderingQueue = getRandomKey("ordering_queue");
Expand Down
57 changes: 57 additions & 0 deletions packages/amqp/src/mq.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import type {
MessageQueue,
MessageQueueDepth,
MessageQueueEnqueueOptions,
MessageQueueListenOptions,
} from "@fedify/fedify";
// @deno-types="npm:@types/amqplib@^0.10.7"
import type { Channel, ChannelModel, ConsumeMessage } from "amqplib";
import { Buffer } from "node:buffer";

function isQueueNotFoundError(error: unknown): boolean {
return typeof error === "object" && error != null &&
"code" in error && error.code === 404;
}

/**
* Options for ordering key support in {@link AmqpMessageQueue}.
*
Expand Down Expand Up @@ -127,6 +133,7 @@ export class AmqpMessageQueue implements MessageQueue {
queuePrefix: string;
partitions: number;
};
#delayedQueues: Set<string> = new Set();
Comment thread
dahlia marked this conversation as resolved.
#orderingPrepared: boolean = false;

readonly nativeRetrial: boolean;
Expand Down Expand Up @@ -263,6 +270,7 @@ export class AmqpMessageQueue implements MessageQueue {
deadLetterRoutingKey,
messageTtl: delay,
});
Comment thread
dahlia marked this conversation as resolved.
this.#delayedQueues.add(queue);
Comment thread
dahlia marked this conversation as resolved.
Outdated
}
channel.sendToQueue(
queue,
Expand Down Expand Up @@ -345,6 +353,7 @@ export class AmqpMessageQueue implements MessageQueue {
deadLetterRoutingKey,
messageTtl: delay,
});
this.#delayedQueues.add(queue);
}

for (const message of messages) {
Expand All @@ -359,6 +368,54 @@ export class AmqpMessageQueue implements MessageQueue {
}
}

async getDepth(): Promise<MessageQueueDepth> {
let channel: Channel | undefined = await this.#connection.createChannel();
const closeChannel = async () => {
if (channel == null) return;
const currentChannel = channel;
channel = undefined;
try {
await currentChannel.close();
} catch {
// The channel can already be closed by a failed passive queue check.
}
};
try {
await this.#prepareQueue(channel);
await this.#prepareOrdering(channel);

let ready = (await channel.checkQueue(this.#queue)).messageCount;
if (this.#ordering != null) {
for (let i = 0; i < this.#ordering.partitions; i++) {
ready += (await channel.checkQueue(this.#getOrderingQueueName(i)))
.messageCount;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}
}

let delayed = 0;
for (const queue of [...this.#delayedQueues]) {
try {
delayed += (await channel.checkQueue(queue)).messageCount;
} catch (error) {
if (!isQueueNotFoundError(error)) {
throw error;
}
this.#delayedQueues.delete(queue);
await closeChannel();
channel = await this.#connection.createChannel();
}
Comment thread
dahlia marked this conversation as resolved.
Comment thread
dahlia marked this conversation as resolved.
Outdated
}

return {
queued: ready + delayed,
ready,
delayed,
};
} finally {
await closeChannel();
}
}

async listen(
// deno-lint-ignore no-explicit-any
handler: (message: any) => void | Promise<void>,
Expand Down
Loading
Loading