Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 20 additions & 38 deletions packages/turso/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,6 @@ export function createQueue(config: QueueConfig): {

/**
* Poll for and process pending messages.
* Uses a write transaction to atomically claim messages and prevent race conditions.
*/
async function pollAndProcess(): Promise<void> {
if (!isRunning || isShuttingDown) {
Expand All @@ -247,47 +246,30 @@ export function createQueue(config: QueueConfig): {
try {
const now = new Date().toISOString();

// Use a write transaction to atomically select and claim a message
// This prevents race conditions where multiple pollers claim the same message
const tx = await client.transaction('write');
let messageId: MessageId | null = null;
let queueName: ValidQueueName | null = null;
let payload: string | null = null;
let attempt = 1;

try {
// Find a pending message that's ready to process
const result = await tx.execute({
sql: `SELECT message_id, queue_name, payload, attempt
FROM queue_messages
// Atomically claim the oldest pending message that is ready to run.
// The subquery SELECT + outer UPDATE execute as a single implicit
// transaction in SQLite, so no two pollers can claim the same message.
const result = await client.execute({
sql: `UPDATE queue_messages
SET status = 'processing'
WHERE message_id = (
SELECT message_id FROM queue_messages
WHERE status = 'pending' AND (not_before IS NULL OR not_before <= ?)
ORDER BY created_at ASC
LIMIT 1`,
args: [now],
});

if (result.rows.length > 0) {
const row = result.rows[0];
messageId = row.message_id as MessageId;
queueName = row.queue_name as ValidQueueName;
payload = row.payload as string;
attempt = (row.attempt as number) || 1;

// Mark as processing within the same transaction
await tx.execute({
sql: `UPDATE queue_messages SET status = 'processing' WHERE message_id = ?`,
args: [messageId],
});
}
LIMIT 1
)
RETURNING message_id, queue_name, payload, attempt`,
args: [now],
});

await tx.commit();
} catch (err) {
await tx.rollback();
throw err;
}
// Process outside the implicit transaction (it already committed above).
if (result.rows.length > 0) {
const row = result.rows[0];
const messageId = row.message_id as MessageId;
const queueName = row.queue_name as ValidQueueName;
const payload = row.payload as string;
const attempt = (row.attempt as number) || 1;

// Process outside the transaction to avoid holding the lock
if (messageId && queueName && payload) {
await acquireConcurrency();
processMessage(messageId, queueName, payload, attempt)
.catch((err) => {
Expand Down