A reliable task queue worker for Supabase Edge Functions that enhances Background Tasks with powerful features.
Note
This project and all its components are licensed under Apache 2.0 license.
Edge Worker processes messages from a PostgreSQL queue and executes handler functions in Supabase Edge Functions, with built-in reliability features:
- ⚡ Reliable Processing - Automatic retries with configurable delays
- 🔄 Concurrency Control - Process multiple tasks in parallel with limits
- 🔁 Auto Restarts - Handles Edge Function CPU/memory limits gracefully
- 📈 Horizontal Scaling - Deploy multiple instances for the same queue
// Import directly from JSR in your Edge Function
import { EdgeWorker } from 'jsr:@pgflow/edge-worker';Warning
Always import from JSR.io using the jsr: prefix. Never install from npm.
For database setup, see pgflow installation docs.
You can use Edge Worker as a simple single-handler message processor.
Just pass it a handler function to .start():
import { EdgeWorker } from 'jsr:@pgflow/edge-worker';
// Start a worker that processes messages from the 'tasks' queue
EdgeWorker.start(async (payload, context) => {
console.log('Processing message:', payload);
// Access platform resources through context
const result = await context.sql`
INSERT INTO processed_tasks (data)
VALUES (${JSON.stringify(payload)})
RETURNING id
`;
return { processed: true, id: result[0].id };
});You can also use Edge Worker as a processor for Flow steps.
This will change how it polls and acknowledges messages.
Just pass it a Flow definition to .start():
import { EdgeWorker } from 'jsr:@pgflow/edge-worker';
import { Flow } from 'jsr:@pgflow/dsl/supabase';
// Define a flow using Supabase preset for Supabase resources
const AnalyzeWebsite = new Flow<{ url: string }>({
slug: 'analyzeWebsite',
})
.step({ slug: 'fetch' }, async (flowInput, context) => {
// Access Supabase resources through context
const response = await fetch(flowInput.url, {
signal: context.shutdownSignal,
});
return { html: await response.text() };
})
.step({ slug: 'save', dependsOn: ['fetch'] }, async (deps, context) => {
// Use Supabase client from context
const { data } = await context.supabase
.from('websites')
.insert({ url: context.flowInput.url, html: deps.fetch.html })
.select()
.single();
return data;
});
// Start the worker
EdgeWorker.start(AnalyzeWebsite);EdgeWorker automatically provides a context object as the second parameter to all handlers. The context contains platform resources and runtime information.
These resources are provided regardless of platform:
env- Environment variables (Record<string, string | undefined>)shutdownSignal- AbortSignal for graceful shutdown handlingrawMessage- Original pgmq message with metadatainterface PgmqMessageRecord<T> { msg_id: number; read_ct: number; enqueued_at: Date; vt: Date; message: T; }
stepTask- Current step task details (flow handlers only)interface StepTaskRecord<TFlow> { flow_slug: string; run_id: string; step_slug: string; input: StepInput<TFlow, StepSlug>; msg_id: number; }
When running on Supabase (the default), these additional resources are available:
sql- PostgreSQL client (postgres.Sql) for database queriessupabase- Supabase client (SupabaseClient) with service role key for full database access
The Supabase platform adapter requires these environment variables:
EDGE_WORKER_DB_URL- PostgreSQL connection string (automatically set by Supabase)SUPABASE_URL- Your Supabase project URL (automatically set)SUPABASE_ANON_KEY- Anonymous key (automatically set, available for autocomplete)SUPABASE_SERVICE_ROLE_KEY- Service role key (automatically set)SB_EXECUTION_ID- Execution ID for the Edge Function (automatically set)
All these variables are automatically populated by Supabase Edge Functions runtime.
Note
While SUPABASE_ANON_KEY is required in the environment, the edge worker only uses the service role key for creating the Supabase client.
// Queue handler with context
EdgeWorker.start(async (payload, context) => {
// Check environment variables
if (context.env.FEATURE_FLAG === 'enabled') {
// Use SQL client
await context.sql`UPDATE tasks SET processed = true WHERE id = ${payload.id}`;
}
// Handle graceful shutdown
if (context.shutdownSignal.aborted) {
return { status: 'aborted' };
}
// Use Supabase client
const { data } = await context.supabase
.from('results')
.insert({ task_id: payload.id })
.select();
return data;
});When defining flows that use Supabase resources, import Flow from the Supabase preset:
import { Flow } from 'jsr:@pgflow/dsl/supabase';
const MyFlow = new Flow<InputType>({ slug: 'myFlow' }).step(
{ slug: 'process' },
async (input, context) => {
// TypeScript knows context includes all Supabase resources
const users = await context.sql`SELECT * FROM users`;
return users;
}
);Note
Context is optional for backward compatibility. Handlers that don't need platform resources can omit the second parameter.
For more details on the context object and available resources, see the Context documentation.
For complete documentation, visit:
For manual end-to-end testing of edge-worker features, we maintain example edge functions in the supabase/functions directory.
- Retry Demo - Demonstrates exponential backoff retry mechanism
To run a manual test:
- Start Supabase:
pnpm nx supabase:start edge-worker - Follow the instructions in the specific test's README
This package uses jsr:@oscar6echo/postgres - a JSR-compatible fork of the popular postgres package.
Why a fork? The official npm:postgres package uses CommonJS exports (export =) which are incompatible with JSR's ES Module requirements. The npm version also fails to parse database URLs correctly in Supabase Edge Runtime, causing CONNECT_TIMEOUT errors. See porsager/postgres#839 for details.
The fork is functionally identical to postgres v3.4.5, with only the export syntax changed for Deno/JSR compatibility.
Run nx build edge-worker to build the library.