@@ -213,6 +213,78 @@ const federation = createFederation({
213213[ @fedify/amqp ] : https://github.com/fedify-dev/amqp
214214[ RabbitMQ ] : https://www.rabbitmq.com/
215215
216+ ### ` WorkersMessageQueue ` (Cloudflare Workers only)
217+
218+ * This API is available since Fedify 1.6.0.*
219+
220+ ` WorkersMessageQueue ` is a message queue implementation for [ Cloudflare Workers]
221+ that uses Cloudflare's built-in [ Cloudflare Queues] API. It provides
222+ scalability and high performance, making it suitable for production use in
223+ Cloudflare Workers environments. It requires a Cloudflare Queues setup and
224+ management.
225+
226+ Best for
227+ : Production use in Cloudflare Workers environments.
228+
229+ Pros
230+ : Persistent, reliable, scalable, easy to set up.
231+
232+ Cons
233+ : Only available in Cloudflare Workers runtime.
234+
235+ ~~~~ typescript twoslash
236+ // @noErrors: 2322 2345
237+ import type { FederationBuilder , KvStore } from " @fedify/fedify" ;
238+ const builder = undefined as unknown as FederationBuilder <void >;
239+ // ---cut-before---
240+ import type { Federation , Message } from " @fedify/fedify" ;
241+ import { WorkersMessageQueue } from " @fedify/fedify/x/cfworkers" ;
242+
243+ export default {
244+ async fetch(request , env , ctx ) {
245+ const federation: Federation <void > = await builder .build ({
246+ // ---cut-start---
247+ kv: undefined as unknown as KvStore ,
248+ // ---cut-end---
249+ queue: new WorkersMessageQueue (env .QUEUE_BINDING ),
250+ });
251+ // Omit the rest of the code for brevity
252+ },
253+
254+ // Since defining a `queue()` method is the only way to consume messages
255+ // from the queue in Cloudflare Workers, we need to define it so that
256+ // the messages can be manually processed by `Federation.processQueuedTask()`
257+ // method:
258+ async queue(batch , env , ctx ) {
259+ const federation: Federation <void > = await builder .build ({
260+ // ---cut-start---
261+ kv: undefined as unknown as KvStore ,
262+ // ---cut-end---
263+ queue: new WorkersMessageQueue (env .QUEUE_BINDING ),
264+ });
265+ for (const msg of batch .messages ) {
266+ await federation .processQueuedTask (
267+ undefined , // You need to pass your context data here
268+ msg .body as Message , // You need to cast the message body to `Message`
269+ );
270+ }
271+ }
272+ } satisfies ExportedHandler <{ QUEUE_BINDING: Queue }>;
273+ ~~~~
274+
275+ > [ !NOTE]
276+ > The [ Cloudflare Queues] API does not provide a way to poll messages from
277+ > the queue, so ` WorkersMessageQueue.listen() ` method always throws
278+ > a ` TypeError ` when invoked. Instead, you should define a ` queue() ` method
279+ > in your Cloudflare worker, which will be called by the Cloudflare Queues
280+ > API when new messages are available in the queue. Inside the ` queue() `
281+ > method, you need to call ` Federation.processQueuedTask() ` method to manually
282+ > process the messages. The ` queue() ` method is the only way to consume
283+ > messages from the queue in Cloudflare Workers.
284+
285+ [ Cloudflare Workers ] : https://workers.cloudflare.com/
286+ [ Cloudflare Queues ] : https://developers.cloudflare.com/queues/
287+
216288
217289Implementing a custom ` MessageQueue `
218290------------------------------------
0 commit comments