Skip to content

Commit 9e47b55

Browse files
committed
WorkersMessageQueue class
1 parent 79f9f31 commit 9e47b55

5 files changed

Lines changed: 134 additions & 6 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ To be released. Note that 1.6.0 was skipped due to a mistake in the versioning.
4444
- Added `Federation.processQueuedTask()` method. [[#242]]
4545
- Added `Message` type. [[#242]]
4646
- Added `WorkersKvStore` class. [[#241], [#242]]
47+
- Added `WorkersMessageQueue` class. [[#241], [#242]]
4748

4849
- The minimum supported version of Node.js is now 22.0.0.
4950

fedify/cfworkers/client.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ const mf = new Miniflare({
1212
{ type: "ESModule", path: join(import.meta.dirname ?? ".", "server.js") },
1313
],
1414
kvNamespaces: ["KV1", "KV2", "KV3"],
15+
queueProducers: ["Q1"],
16+
queueConsumers: { Q1: { maxBatchSize: 1 } },
1517
async outboundService(request: Request) {
1618
const url = new URL(request.url);
1719
if (url.hostname.endsWith(".test")) {

fedify/cfworkers/server.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ interface TestDefinition {
2121
// @ts-ignore: testDefinitions is untyped
2222
const tests: TestDefinition[] = testDefinitions;
2323
const logs: LogRecord[] = [];
24+
const messageBatches: MessageBatch[] = [];
2425

2526
await configure({
2627
sinks: {
@@ -98,7 +99,7 @@ export default {
9899
}
99100
logs.splice(0, logs.length); // Clear logs
100101
try {
101-
await fn({ name, origin: "", step, env });
102+
await fn({ name, origin: "", step, env, messageBatches });
102103
} catch (e) {
103104
failed ??= e;
104105
}
@@ -130,4 +131,11 @@ export default {
130131
},
131132
);
132133
},
134+
async queue(
135+
batch: MessageBatch,
136+
env: unknown,
137+
ctx: ExecutionContext
138+
): Promise<void> {
139+
messageBatches.push(batch);
140+
}
133141
};

fedify/x/cfworkers.test.ts

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import type { KVNamespace } from "@cloudflare/workers-types/experimental";
2+
import { delay } from "@es-toolkit/es-toolkit";
23
import { assertEquals } from "@std/assert";
34
import { test } from "../testing/mod.ts";
4-
import { WorkersKvStore } from "./cfworkers.ts";
5+
import { WorkersKvStore, WorkersMessageQueue } from "./cfworkers.ts";
56

67
test({
78
name: "WorkersKvStore",
@@ -30,3 +31,47 @@ test({
3031
});
3132
},
3233
});
34+
35+
test({
36+
name: "WorkersMessageQueue",
37+
ignore: !("navigator" in globalThis &&
38+
navigator.userAgent === "Cloudflare-Workers"),
39+
async fn(t) {
40+
const { env, messageBatches } = t as unknown as {
41+
env: Record<string, Queue>;
42+
messageBatches: MessageBatch[];
43+
};
44+
const queue = new WorkersMessageQueue(env.Q1);
45+
await queue.enqueue({ foo: 1, bar: 2 });
46+
await waitFor(() => messageBatches.length > 0, 5000);
47+
assertEquals(messageBatches.length, 1);
48+
assertEquals(messageBatches[0].queue, "Q1");
49+
assertEquals(messageBatches[0].messages.length, 1);
50+
assertEquals(messageBatches[0].messages[0].body, { foo: 1, bar: 2 });
51+
52+
await queue.enqueue(
53+
{ baz: 3, qux: 4 },
54+
{ delay: Temporal.Duration.from({ seconds: 3 }) },
55+
);
56+
await delay(2000);
57+
assertEquals(messageBatches.length, 1);
58+
await waitFor(() => messageBatches.length > 1, 6000);
59+
assertEquals(messageBatches[1].queue, "Q1");
60+
assertEquals(messageBatches[1].messages.length, 1);
61+
assertEquals(messageBatches[1].messages[0].body, { baz: 3, qux: 4 });
62+
},
63+
});
64+
65+
async function waitFor(
66+
predicate: () => boolean,
67+
timeoutMs: number,
68+
): Promise<void> {
69+
let delayed = 0;
70+
while (!predicate()) {
71+
await delay(500);
72+
delayed += 500;
73+
if (delayed > timeoutMs) {
74+
throw new Error("Timeout");
75+
}
76+
}
77+
}

fedify/x/cfworkers.ts

Lines changed: 76 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,33 @@
1-
import type { KVNamespace } from "@cloudflare/workers-types/experimental";
1+
/**
2+
* `KvStore` & `MessageQueue` adapters for Cloudflare Workers
3+
* ==========================================================
4+
*
5+
* This module provides `KvStore` and `MessageQueue` implementations that use
6+
* Cloudflare Workers' KV and Queues bindings, respectively.
7+
*
8+
* @module
9+
* @since 1.6.0
10+
*/
11+
import type {
12+
KVNamespace,
13+
MessageSendRequest,
14+
Queue,
15+
} from "@cloudflare/workers-types/experimental";
216
import type { KvKey, KvStore, KvStoreSetOptions } from "../federation/kv.ts";
17+
import type {
18+
MessageQueue,
19+
MessageQueueEnqueueOptions,
20+
MessageQueueListenOptions,
21+
} from "../federation/mq.ts";
322

423
interface KvMetadata {
524
expires?: number;
625
}
726

827
/**
9-
* Implementation of the KvStore interface for Cloudflare Workers KV binding.
10-
* This class provides a wrapper around Cloudflare's KV namespace to store and
11-
* retrieve JSON-serializable values using structured keys.
28+
* Implementation of the {@link KvStore} interface for Cloudflare Workers KV
29+
* binding. This class provides a wrapper around Cloudflare's KV namespace to
30+
* store and retrieve JSON-serializable values using structured keys.
1231
* @since 1.6.0
1332
*/
1433
export class WorkersKvStore implements KvStore {
@@ -58,3 +77,56 @@ export class WorkersKvStore implements KvStore {
5877
return this.#namespace.delete(this.#encodeKey(key));
5978
}
6079
}
80+
81+
/**
82+
* Implementation of the {@link MessageQueue} interface for Cloudflare
83+
* Workers Queues binding. This class provides a wrapper around Cloudflare's
84+
* Queues to send messages to a queue.
85+
*
86+
* Note that this implementation does not support the `listen()` method,
87+
* as Cloudflare Workers Queues do not support message consumption in the same
88+
* way as other message queue systems. Instead, you should use
89+
* the {@link Federation.processQueuedTask} method to process messages
90+
* passed to the queue.
91+
* @since 1.6.0
92+
*/
93+
export class WorkersMessageQueue implements MessageQueue {
94+
#queue: Queue;
95+
96+
constructor(queue: Queue) {
97+
this.#queue = queue;
98+
}
99+
100+
// deno-lint-ignore no-explicit-any
101+
enqueue(message: any, options?: MessageQueueEnqueueOptions): Promise<void> {
102+
return this.#queue.send(message, {
103+
contentType: "json",
104+
delaySeconds: options?.delay?.total("seconds") ?? 0,
105+
});
106+
}
107+
108+
enqueueMany(
109+
// deno-lint-ignore no-explicit-any
110+
messages: any[],
111+
options?: MessageQueueEnqueueOptions,
112+
): Promise<void> {
113+
const requests: MessageSendRequest[] = messages.map((msg) => ({
114+
body: msg,
115+
contentType: "json",
116+
}));
117+
return this.#queue.sendBatch(requests, {
118+
delaySeconds: options?.delay?.total("seconds") ?? 0,
119+
});
120+
}
121+
122+
listen(
123+
// deno-lint-ignore no-explicit-any
124+
_handler: (message: any) => Promise<void> | void,
125+
_options?: MessageQueueListenOptions,
126+
): Promise<void> {
127+
throw new TypeError(
128+
"WorkersMessageQueue does not support listen(). " +
129+
"Use Federation.processQueuedTask() method instead.",
130+
);
131+
}
132+
}

0 commit comments

Comments
 (0)