Skip to content

Commit a3b0c3d

Browse files
committed
fix(turso): replace client.transaction() with UPDATE...RETURNING to prevent connection leak in queue poller
@libsql/client's transaction() method sets this.#db = null after BEGIN so that concurrent execute() calls use a separate connection. After commit/rollback, the detached Database object is abandoned to GC. At the default 100ms poll interval this orphans ~10 native SQLite connections per second — more than GC can reclaim — exhausting OS file-handle or native-heap limits and crashing the host process (exit code 5) after 5-10 minutes of idle operation. Replacing the three-step SELECT + UPDATE + COMMIT with a single atomic UPDATE...RETURNING via client.execute() preserves atomicity through SQLite's implicit per-statement transaction while reusing the single cached connection (execute() calls #getDb() without nulling #db). Fixes: https://github.com/mizzle-dev/workflow-worlds/issues/TBD
1 parent 4c2fed0 commit a3b0c3d

1 file changed

Lines changed: 39 additions & 38 deletions

File tree

packages/turso/src/queue.ts

Lines changed: 39 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,25 @@ export function createQueue(config: QueueConfig): {
237237

238238
/**
239239
* Poll for and process pending messages.
240-
* Uses a write transaction to atomically claim messages and prevent race conditions.
240+
*
241+
* Uses a single atomic `UPDATE ... RETURNING` to claim a message instead of
242+
* the previous `client.transaction('write')` pattern.
243+
*
244+
* Why: `@libsql/client`'s `transaction()` method intentionally sets
245+
* `this.#db = null` after calling BEGIN so that concurrent `execute()` calls
246+
* on the same client use a separate connection while the transaction is open.
247+
* The consequence is that the `Database` object passed to `Sqlite3Transaction`
248+
* is decoupled from the client and only reclaimed by GC once the transaction
249+
* variable goes out of scope. At the default 100 ms poll interval this
250+
* produces ~10 orphaned native SQLite connections per second. The connections
251+
* accumulate faster than GC can reclaim them, exhausting OS file-handle or
252+
* native-heap limits and causing the host process to crash (typically exit
253+
* code 5) after 5–10 minutes of idle operation.
254+
*
255+
* `client.execute()` reuses the single cached connection via `#getDb()`
256+
* without detaching it, so no connection is orphaned. The atomicity that
257+
* the write transaction previously provided is preserved by SQLite's implicit
258+
* per-statement transaction around the `UPDATE ... RETURNING`.
241259
*/
242260
async function pollAndProcess(): Promise<void> {
243261
if (!isRunning || isShuttingDown) {
@@ -247,47 +265,30 @@ export function createQueue(config: QueueConfig): {
247265
try {
248266
const now = new Date().toISOString();
249267

250-
// Use a write transaction to atomically select and claim a message
251-
// This prevents race conditions where multiple pollers claim the same message
252-
const tx = await client.transaction('write');
253-
let messageId: MessageId | null = null;
254-
let queueName: ValidQueueName | null = null;
255-
let payload: string | null = null;
256-
let attempt = 1;
257-
258-
try {
259-
// Find a pending message that's ready to process
260-
const result = await tx.execute({
261-
sql: `SELECT message_id, queue_name, payload, attempt
262-
FROM queue_messages
268+
// Atomically claim the oldest pending message that is ready to run.
269+
// The subquery SELECT + outer UPDATE execute as a single implicit
270+
// transaction in SQLite, so no two pollers can claim the same message.
271+
const result = await client.execute({
272+
sql: `UPDATE queue_messages
273+
SET status = 'processing'
274+
WHERE message_id = (
275+
SELECT message_id FROM queue_messages
263276
WHERE status = 'pending' AND (not_before IS NULL OR not_before <= ?)
264277
ORDER BY created_at ASC
265-
LIMIT 1`,
266-
args: [now],
267-
});
268-
269-
if (result.rows.length > 0) {
270-
const row = result.rows[0];
271-
messageId = row.message_id as MessageId;
272-
queueName = row.queue_name as ValidQueueName;
273-
payload = row.payload as string;
274-
attempt = (row.attempt as number) || 1;
275-
276-
// Mark as processing within the same transaction
277-
await tx.execute({
278-
sql: `UPDATE queue_messages SET status = 'processing' WHERE message_id = ?`,
279-
args: [messageId],
280-
});
281-
}
278+
LIMIT 1
279+
)
280+
RETURNING message_id, queue_name, payload, attempt`,
281+
args: [now],
282+
});
282283

283-
await tx.commit();
284-
} catch (err) {
285-
await tx.rollback();
286-
throw err;
287-
}
284+
// Process outside the implicit transaction (it already committed above).
285+
if (result.rows.length > 0) {
286+
const row = result.rows[0];
287+
const messageId = row.message_id as MessageId;
288+
const queueName = row.queue_name as ValidQueueName;
289+
const payload = row.payload as string;
290+
const attempt = (row.attempt as number) || 1;
288291

289-
// Process outside the transaction to avoid holding the lock
290-
if (messageId && queueName && payload) {
291292
await acquireConcurrency();
292293
processMessage(messageId, queueName, payload, attempt)
293294
.catch((err) => {

0 commit comments

Comments
 (0)