@@ -213,6 +213,88 @@ const federation = createFederation({
213213[ @fedify/amqp ] : https://github.com/fedify-dev/amqp
214214[ RabbitMQ ] : https://www.rabbitmq.com/
215215
216+ ### ` WorkersMessageQueue ` (Cloudflare Workers only)
217+
218+ * This API is available since Fedify 1.6.0.*
219+
220+ ` WorkersMessageQueue ` is a message queue implementation for [ Cloudflare Workers]
221+ that uses Cloudflare's built-in [ Cloudflare Queues] API. It provides
222+ scalability and high performance, making it suitable for production use in
223+ Cloudflare Workers environments. It requires a Cloudflare Queues setup and
224+ management.
225+
226+ Best for
227+ : Production use in Cloudflare Workers environments.
228+
229+ Pros
230+ : Persistent, reliable, scalable, easy to set up.
231+
232+ Cons
233+ : Only available in Cloudflare Workers runtime.
234+
235+ ~~~~ typescript twoslash
236+ // @noErrors: 2322 2345
237+ import type { FederationBuilder , KvStore } from " @fedify/fedify" ;
238+ const builder = undefined as unknown as FederationBuilder <void >;
239+ // ---cut-before---
240+ import type { Federation , Message } from " @fedify/fedify" ;
241+ import { WorkersMessageQueue } from " @fedify/fedify/x/cfworkers" ;
242+
243+ export default {
244+ async fetch(request , env , ctx ) {
245+ const federation: Federation <void > = await builder .build ({
246+ // ---cut-start---
247+ kv: undefined as unknown as KvStore ,
248+ // ---cut-end---
249+ queue: new WorkersMessageQueue (env .QUEUE_BINDING ),
250+ });
251+ // Omit the rest of the code for brevity
252+ },
253+
254+ // Since defining a `queue()` method is the only way to consume messages
255+ // from the queue in Cloudflare Workers, we need to define it so that
256+ // the messages can be manually processed by `Federation.processQueuedTask()`
257+ // method:
258+ async queue(batch , env , ctx ) {
259+ const federation: Federation <void > = await builder .build ({
260+ // ---cut-start---
261+ kv: undefined as unknown as KvStore ,
262+ // ---cut-end---
263+ queue: new WorkersMessageQueue (env .QUEUE_BINDING ),
264+ });
265+ for (const msg of batch .messages ) {
266+ await federation .processQueuedTask (
267+ undefined , // You need to pass your context data here
268+ msg .body as Message , // You need to cast the message body to `Message`
269+ );
270+ }
271+ }
272+ } satisfies ExportedHandler <{ QUEUE_BINDING: Queue }>;
273+ ~~~~
274+
275+ > [ !NOTE]
276+ > Since your ` Queue ` is not bound to a global variable, but rather passed as
277+ > an argument to the ` fetch() ` and ` queue() ` methods, you need to instantiate
278+ > your ` Federation ` object inside these methods, rather than at the top level.
279+ >
280+ > For better organization, you probably want to use a builder pattern to
281+ > register your dispatchers and listeners before instantiating the ` Federation `
282+ > object. See the [ * Builder pattern for structuring*
283+ > section] ( ./federation.md#builder-pattern-for-structuring ) for details.
284+
285+ > [ !NOTE]
286+ > The [ Cloudflare Queues] API does not provide a way to poll messages from
287+ > the queue, so ` WorkersMessageQueue.listen() ` method always throws
288+ > a ` TypeError ` when invoked. Instead, you should define a ` queue() ` method
289+ > in your Cloudflare worker, which will be called by the Cloudflare Queues
290+ > API when new messages are available in the queue. Inside the ` queue() `
291+ > method, you need to call ` Federation.processQueuedTask() ` method to manually
292+ > process the messages. The ` queue() ` method is the only way to consume
293+ > messages from the queue in Cloudflare Workers.
294+
295+ [ Cloudflare Workers ] : https://workers.cloudflare.com/
296+ [ Cloudflare Queues ] : https://developers.cloudflare.com/queues/
297+
216298
217299Implementing a custom ` MessageQueue `
218300------------------------------------
0 commit comments