Skip to content

Commit c094e49

Browse files
authored
Merge pull request #669 from dahlia/docfix
Fix and clarify Cloudflare Workers documentation
2 parents 4104661 + 85fc4b8 commit c094e49

File tree

3 files changed

+77
-9
lines changed

3 files changed

+77
-9
lines changed

docs/manual/deploy.md

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ on your infrastructure:
163163

164164
Development
165165
: Use [`MemoryKvStore`](./kv.md#memorykvstore) and
166-
[`InProcessMessageQueue`](./mq.md#inprocessmessagequeu) for quick setup.
166+
[`InProcessMessageQueue`](./mq.md#inprocessmessagequeue) for quick setup.
167167

168168
Production
169169
: Consider [`PostgresKvStore`](./kv.md#postgreskvstore) and
@@ -291,8 +291,8 @@ export default {
291291
for (const message of batch.messages) {
292292
try {
293293
await federation.processQueuedTask(
294-
message.body as unknown as Message,
295294
env,
295+
message.body as unknown as Message,
296296
);
297297
message.ack();
298298
} catch (error) {
@@ -303,6 +303,13 @@ export default {
303303
};
304304
~~~~
305305

306+
If you use queue ordering keys on Cloudflare Workers, instantiate
307+
`WorkersMessageQueue` with an `orderingKv` namespace and call
308+
`WorkersMessageQueue.processMessage()` before
309+
`Federation.processQueuedTask()`. See the
310+
[*`WorkersMessageQueue`* section](./mq.md#workersmessagequeue-cloudflare-workers-only)
311+
for a complete example and caveats about best-effort ordering.
312+
306313
### Example deployment
307314

308315
For a complete working example, see the [Cloudflare Workers example]

docs/manual/mq.md

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,6 +604,56 @@ export default {
604604
> process the messages. The `queue()` method is the only way to consume
605605
> messages from the queue in Cloudflare Workers.
606606
607+
> [!NOTE]
608+
> If you use `~MessageQueueEnqueueOptions.orderingKey` with
609+
> `WorkersMessageQueue`, you also need to provide a KV namespace for ordering
610+
> locks and pass each raw queue message through
611+
> `~WorkersMessageQueue.processMessage()` before calling
612+
> `Federation.processQueuedTask()`. Otherwise, the ordering key is embedded in
613+
> the message, but not enforced when the worker consumes it.
614+
>
615+
> ~~~~ typescript
616+
> import { createFederationBuilder, type Message } from "@fedify/fedify";
617+
> import { WorkersKvStore, WorkersMessageQueue } from "@fedify/cfworkers";
618+
>
619+
> type Env = {
620+
> KV_NAMESPACE: KVNamespace<string>;
621+
> QUEUE_BINDING: Queue;
622+
> ORDERING_KV: KVNamespace<string>;
623+
> };
624+
>
625+
> const builder = createFederationBuilder<Env>();
626+
>
627+
> export default {
628+
> async queue(batch: MessageBatch<unknown>, env: Env): Promise<void> {
629+
> const queue = new WorkersMessageQueue(env.QUEUE_BINDING, {
630+
> orderingKv: env.ORDERING_KV,
631+
> });
632+
> const federation = await builder.build({
633+
> kv: new WorkersKvStore(env.KV_NAMESPACE),
634+
> queue,
635+
> });
636+
>
637+
> for (const message of batch.messages) {
638+
> const result = await queue.processMessage(message.body);
639+
> if (!result.shouldProcess) {
640+
> message.retry();
641+
> continue;
642+
> }
643+
> try {
644+
> await federation.processQueuedTask(
645+
> env,
646+
> result.message as Message,
647+
> );
648+
> message.ack();
649+
> } finally {
650+
> await result.release?.();
651+
> }
652+
> }
653+
> },
654+
> };
655+
> ~~~~
656+
607657
[Cloudflare Workers]: https://workers.cloudflare.com/
608658
[Cloudflare Queues]: https://developers.cloudflare.com/queues/
609659

packages/cfworkers/README.md

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ implementations for [Cloudflare Workers]:
1616
- [`WorkersMessageQueue`]
1717

1818
~~~~ typescript
19-
import type { Federation } from "@fedify/fedify";
19+
import { createFederation, type Message } from "@fedify/fedify";
2020
import { WorkersKvStore, WorkersMessageQueue } from "@fedify/cfworkers";
2121

2222
export default {
@@ -26,22 +26,25 @@ export default {
2626
queue: new WorkersMessageQueue(env.QUEUE_BINDING),
2727
// ... other options
2828
});
29-
30-
return federation.handle(request, { contextData: env });
29+
30+
return federation.fetch(request, { contextData: env });
3131
},
32-
32+
3333
async queue(batch, env, ctx) {
3434
const federation = createFederation({
3535
kv: new WorkersKvStore(env.KV_BINDING),
3636
queue: new WorkersMessageQueue(env.QUEUE_BINDING),
3737
// ... other options
3838
});
39-
39+
4040
for (const message of batch.messages) {
41-
await federation.processQueuedTask(message.body);
41+
await federation.processQueuedTask(
42+
env,
43+
message.body as unknown as Message,
44+
);
4245
}
4346
}
44-
} satisfies ExportedHandler<{
47+
} satisfies ExportedHandler<{
4548
KV_BINDING: KVNamespace<string>;
4649
QUEUE_BINDING: Queue;
4750
}>;
@@ -101,6 +104,14 @@ management.
101104
> process the messages. The `queue()` method is the only way to consume
102105
> messages from the queue in Cloudflare Workers.
103106
107+
> [!NOTE]
108+
> If you use `orderingKey` when enqueueing messages, construct
109+
> `WorkersMessageQueue` with an `orderingKv` namespace and pass each raw queue
110+
> message through `WorkersMessageQueue.processMessage()` before calling
111+
> `Federation.processQueuedTask()`. This acquires and releases the best-effort
112+
> ordering lock for that key. You can also customize the lock behavior with
113+
> the `orderingKeyPrefix` and `orderingLockTtl` options.
114+
104115
[Cloudflare Queues]: https://developers.cloudflare.com/queues/
105116

106117

0 commit comments

Comments
 (0)