From 987bb4f6ed7990245ce8df676ba1bac0667b7c02 Mon Sep 17 00:00:00 2001 From: Denis Date: Mon, 20 Apr 2026 22:59:23 +0300 Subject: [PATCH] fix(turso): replace client.transaction() with UPDATE...RETURNING to prevent connection leak in queue poller MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit @libsql/client's transaction() method sets this.#db = null after BEGIN so that concurrent execute() calls use a separate connection. After commit/rollback, the detached Database object is abandoned to GC. At the default 100ms poll interval this orphans ~10 native SQLite connections per second — more than GC can reclaim — exhausting OS file-handle or native-heap limits and crashing the host process (exit code 5) after 5-10 minutes of idle operation. Replacing the three-step SELECT + UPDATE + COMMIT with a single atomic UPDATE...RETURNING via client.execute() preserves atomicity through SQLite's implicit per-statement transaction while reusing the single cached connection (execute() calls #getDb() without nulling #db). Fixes: https://github.com/mizzle-dev/workflow-worlds/issues/TBD --- packages/turso/src/queue.ts | 58 +++++++++++++------------------------ 1 file changed, 20 insertions(+), 38 deletions(-) diff --git a/packages/turso/src/queue.ts b/packages/turso/src/queue.ts index f8f2a16..39e170f 100644 --- a/packages/turso/src/queue.ts +++ b/packages/turso/src/queue.ts @@ -237,7 +237,6 @@ export function createQueue(config: QueueConfig): { /** * Poll for and process pending messages. - * Uses a write transaction to atomically claim messages and prevent race conditions. */ async function pollAndProcess(): Promise { if (!isRunning || isShuttingDown) { @@ -247,47 +246,30 @@ export function createQueue(config: QueueConfig): { try { const now = new Date().toISOString(); - // Use a write transaction to atomically select and claim a message - // This prevents race conditions where multiple pollers claim the same message - const tx = await client.transaction('write'); - let messageId: MessageId | null = null; - let queueName: ValidQueueName | null = null; - let payload: string | null = null; - let attempt = 1; - - try { - // Find a pending message that's ready to process - const result = await tx.execute({ - sql: `SELECT message_id, queue_name, payload, attempt - FROM queue_messages + // Atomically claim the oldest pending message that is ready to run. + // The subquery SELECT + outer UPDATE execute as a single implicit + // transaction in SQLite, so no two pollers can claim the same message. + const result = await client.execute({ + sql: `UPDATE queue_messages + SET status = 'processing' + WHERE message_id = ( + SELECT message_id FROM queue_messages WHERE status = 'pending' AND (not_before IS NULL OR not_before <= ?) ORDER BY created_at ASC - LIMIT 1`, - args: [now], - }); - - if (result.rows.length > 0) { - const row = result.rows[0]; - messageId = row.message_id as MessageId; - queueName = row.queue_name as ValidQueueName; - payload = row.payload as string; - attempt = (row.attempt as number) || 1; - - // Mark as processing within the same transaction - await tx.execute({ - sql: `UPDATE queue_messages SET status = 'processing' WHERE message_id = ?`, - args: [messageId], - }); - } + LIMIT 1 + ) + RETURNING message_id, queue_name, payload, attempt`, + args: [now], + }); - await tx.commit(); - } catch (err) { - await tx.rollback(); - throw err; - } + // Process outside the implicit transaction (it already committed above). + if (result.rows.length > 0) { + const row = result.rows[0]; + const messageId = row.message_id as MessageId; + const queueName = row.queue_name as ValidQueueName; + const payload = row.payload as string; + const attempt = (row.attempt as number) || 1; - // Process outside the transaction to avoid holding the lock - if (messageId && queueName && payload) { await acquireConcurrency(); processMessage(messageId, queueName, payload, attempt) .catch((err) => {