Skip to content

Commit a1b7ef3

Browse files
authored
Apply query timeout while waiting for execution lock (#221)
The execution_lock introduced in #218 serializes operations on a connection, but the query timeout was registered only after the lock was acquired. Operations that piled up behind a busy connection could wait many seconds in the queue while the configured timeout (e.g. 500ms) never fired, because the deadline only covered actual execution time. Fix: bound the lock wait by the query timeout via tokio::time::timeout and register the time remaining against the absolute deadline with QueryTimeoutManager once the lock is acquired. If the deadline elapses while waiting, surface SQLITE_INTERRUPT — the same error a user sees when a timeout fires during execution. For prepare()'s stale-interrupt retry, the timeout guard is now dropped before clear_stale_interrupt() and a fresh guard is registered for the retry. If the deadline has already passed at re-register time, register_remaining_timeout() returns SQLITE_INTERRUPT, so a stale interrupt can no longer silently swallow our own timeout. A reproducer is included at perf/query-timeout.js: with the fix, the 100×10 concurrent batch from the bug report finishes in ~5s with the backlog cleanly timing out, vs ~200s wall time and zero interrupts before.
2 parents 9181e85 + 8f7ce0f commit a1b7ef3

3 files changed

Lines changed: 162 additions & 24 deletions

File tree

integration-tests/tests/async.test.js

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,36 @@ test.serial("Timeout on Statement.get() does not leak into later prepare()/EXPLA
548548
db.close();
549549
});
550550

551+
test.serial("Query timeout covers wait time on the execution lock", async (t) => {
552+
t.timeout(15_000);
553+
const [db, errorType] = await connect(":memory:", { defaultQueryTimeout: 200 });
554+
await db.exec("CREATE TABLE t(x INTEGER)");
555+
const insert = await db.prepare("INSERT INTO t VALUES (?)");
556+
for (let i = 0; i < 5000; i++) {
557+
await insert.run(i);
558+
}
559+
const stmt = await db.prepare("SELECT * FROM t ORDER BY x ASC");
560+
561+
let interrupts = 0;
562+
const promises = [];
563+
for (let i = 0; i < 50; i++) {
564+
promises.push(
565+
stmt.all().catch((err) => {
566+
if (err.code === "SQLITE_INTERRUPT") {
567+
interrupts++;
568+
} else {
569+
throw err;
570+
}
571+
})
572+
);
573+
}
574+
await Promise.all(promises);
575+
576+
t.true(interrupts > 0, `expected some queries to timeout, got ${interrupts}`);
577+
578+
db.close();
579+
});
580+
551581
test.serial("Per-query timeout option is accepted by Database.exec()", async (t) => {
552582
const [db] = await connect(":memory:");
553583
await db.exec("SELECT 1", { queryTimeout: 100 });

perf/query-timeout.js

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import { connect } from "libsql/promise";
2+
3+
const TIMEOUT = 500;
4+
const db = await connect(":memory:", {
5+
defaultQueryTimeout: TIMEOUT,
6+
});
7+
8+
await db.exec("CREATE TABLE t(x INTEGER)");
9+
const insert = await db.prepare("INSERT INTO t values (?)");
10+
for (let i = 0; i < 10000; i++) {
11+
await insert.run(i);
12+
}
13+
14+
let okCount = 0;
15+
let errCount = 0;
16+
let maxDuration = 0;
17+
18+
async function query() {
19+
const stmt = await db.prepare("SELECT * FROM t ORDER BY x ASC");
20+
const res = await stmt.all();
21+
return res;
22+
}
23+
24+
async function batch(n) {
25+
for (let i = 0; i < n; i++) {
26+
const start = performance.now();
27+
await query()
28+
.then(() => {
29+
const duration = performance.now() - start;
30+
if (duration > maxDuration) maxDuration = duration;
31+
okCount++;
32+
})
33+
.catch((e) => {
34+
const duration = performance.now() - start;
35+
if (duration > maxDuration) maxDuration = duration;
36+
errCount++;
37+
if (errCount <= 5) console.error("err:", e.code || e.name, e.message, duration.toFixed(1));
38+
});
39+
}
40+
}
41+
42+
const startAll = performance.now();
43+
const batches = [];
44+
for (let i = 0; i < 100; i++) {
45+
batches.push(batch(10));
46+
}
47+
await Promise.all(batches);
48+
const total = performance.now() - startAll;
49+
console.log(`total wall: ${total.toFixed(1)}ms; ok=${okCount}, err=${errCount}, maxDur=${maxDuration.toFixed(1)}ms`);

src/lib.rs

Lines changed: 83 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -31,21 +31,13 @@ use napi_derive::napi;
3131
use once_cell::sync::OnceCell;
3232
use query_timeout::{QueryTimeoutGuard, QueryTimeoutManager};
3333

34-
/// Registers a query timeout against the process-wide timer wheel, if one was
35-
/// requested. Returns a guard that cancels the timeout when dropped.
36-
fn register_query_timeout(
37-
conn: &Arc<libsql::Connection>,
38-
query_timeout: Option<Duration>,
39-
) -> Option<QueryTimeoutGuard> {
40-
query_timeout.map(|t| QueryTimeoutManager::global().register(conn, t))
41-
}
4234
use std::{
4335
str::FromStr,
4436
sync::{
4537
atomic::{AtomicBool, Ordering},
4638
Arc, Mutex,
4739
},
48-
time::Duration,
40+
time::{Duration, Instant},
4941
};
5042
use tokio::runtime::Runtime;
5143
use tracing_subscriber::{filter::LevelFilter, EnvFilter};
@@ -412,11 +404,17 @@ impl Database {
412404
));
413405
}
414406
};
415-
let _execution_guard = self.execution_lock.clone().lock_owned().await;
407+
let (_execution_guard, deadline) =
408+
acquire_execution_lock(&self.execution_lock, self.query_timeout).await?;
409+
let timeout_guard = register_remaining_timeout(&conn, deadline)?;
416410
let stmt = match conn.prepare(&sql).await {
417411
Ok(stmt) => stmt,
418412
Err(err) if is_sqlite_interrupt(&err) => {
413+
// Drop our guard before clear_stale_interrupt so the bg thread
414+
// can't fire conn.interrupt() for our id mid-probe.
415+
drop(timeout_guard);
419416
clear_stale_interrupt(&conn).await;
417+
let _retry_guard = register_remaining_timeout(&conn, deadline)?;
420418
conn.prepare(&sql).await.map_err(Error::from)?
421419
}
422420
Err(err) => return Err(Error::from(err).into()),
@@ -575,8 +573,9 @@ impl Database {
575573
Some(timeout_ms) => query_timeout_duration(timeout_ms),
576574
None => self.query_timeout,
577575
};
578-
let _execution_guard = self.execution_lock.clone().lock_owned().await;
579-
let _timeout_guard = register_query_timeout(&conn, query_timeout);
576+
let (_execution_guard, deadline) =
577+
acquire_execution_lock(&self.execution_lock, query_timeout).await?;
578+
let _timeout_guard = register_remaining_timeout(&conn, deadline)?;
580579
conn.execute_batch(&sql).await.map_err(Error::from)?;
581580
Ok(())
582581
}
@@ -866,6 +865,60 @@ fn is_sqlite_interrupt(err: &libsql::Error) -> bool {
866865
)
867866
}
868867

868+
fn query_timeout_error() -> napi::Error {
869+
throw_sqlite_error(
870+
"interrupted".to_string(),
871+
"SQLITE_INTERRUPT".to_string(),
872+
libsql::ffi::SQLITE_INTERRUPT,
873+
)
874+
}
875+
876+
fn timeout_deadline(timeout: Duration) -> Instant {
877+
let now = Instant::now();
878+
now.checked_add(timeout)
879+
.unwrap_or_else(|| now + Duration::from_secs(86400))
880+
}
881+
882+
fn register_remaining_timeout(
883+
conn: &Arc<libsql::Connection>,
884+
deadline: Option<Instant>,
885+
) -> Result<Option<QueryTimeoutGuard>> {
886+
match deadline {
887+
Some(deadline) => match deadline.checked_duration_since(Instant::now()) {
888+
Some(remaining) if remaining > Duration::ZERO => {
889+
Ok(Some(QueryTimeoutManager::global().register(conn, remaining)))
890+
}
891+
_ => Err(query_timeout_error()),
892+
},
893+
None => Ok(None),
894+
}
895+
}
896+
897+
/// Acquire the per-connection execution lock, enforcing the query timeout
898+
/// across the queue-wait. Returns the lock guard and the absolute deadline
899+
/// for the current operation. If the timeout elapses while waiting, returns
900+
/// a SQLITE_INTERRUPT error so the caller sees the same behaviour as when a
901+
/// timeout fires during execution.
902+
async fn acquire_execution_lock(
903+
lock: &Arc<tokio::sync::Mutex<()>>,
904+
timeout: Option<Duration>,
905+
) -> Result<(tokio::sync::OwnedMutexGuard<()>, Option<Instant>)> {
906+
let lock = lock.clone();
907+
match timeout {
908+
None => Ok((lock.lock_owned().await, None)),
909+
Some(t) => {
910+
let deadline = timeout_deadline(t);
911+
match tokio::time::timeout(t, lock.lock_owned()).await {
912+
Ok(guard) => match deadline.checked_duration_since(Instant::now()) {
913+
Some(remaining) if remaining > Duration::ZERO => Ok((guard, Some(deadline))),
914+
_ => Err(query_timeout_error()),
915+
},
916+
Err(_) => Err(query_timeout_error()),
917+
}
918+
}
919+
}
920+
}
921+
869922
async fn clear_stale_interrupt(conn: &Arc<libsql::Connection>) {
870923
// If a timeout interrupt races with operation completion, the next operation
871924
// can observe a stale SQLITE_INTERRUPT. Probe the connection to consume it.
@@ -949,8 +1002,9 @@ impl Statement {
9491002
let execution_lock = self.execution_lock.clone();
9501003

9511004
let future = async move {
952-
let _execution_guard = execution_lock.lock_owned().await;
953-
let _timeout_guard = register_query_timeout(&conn, query_timeout);
1005+
let (_execution_guard, deadline) =
1006+
acquire_execution_lock(&execution_lock, query_timeout).await?;
1007+
let _timeout_guard = register_remaining_timeout(&conn, deadline)?;
9541008
stmt.run(params).await.map_err(Error::from)?;
9551009
let changes = if conn.total_changes() == total_changes_before {
9561010
0
@@ -1002,9 +1056,10 @@ impl Statement {
10021056
let query_timeout = self.resolve_query_timeout(query_options);
10031057
let execution_lock = self.execution_lock.clone();
10041058
let future = async move {
1059+
let (_execution_guard, deadline) =
1060+
acquire_execution_lock(&execution_lock, query_timeout).await?;
10051061
let result: std::result::Result<(Option<libsql::Row>, Option<f64>), Error> = {
1006-
let _execution_guard = execution_lock.lock_owned().await;
1007-
let _timeout_guard = register_query_timeout(&conn, query_timeout);
1062+
let _timeout_guard = register_remaining_timeout(&conn, deadline)?;
10081063
async {
10091064
let mut rows = stmt_fut.query(params).await.map_err(Error::from)?;
10101065
let row = rows.next().await.map_err(Error::from)?;
@@ -1086,8 +1141,9 @@ impl Statement {
10861141
let query_timeout = self.resolve_query_timeout(query_options);
10871142
let execution_lock = self.execution_lock.clone();
10881143
let future = async move {
1089-
let execution_guard = execution_lock.lock_owned().await;
1090-
let timeout_guard = register_query_timeout(&conn, query_timeout);
1144+
let (execution_guard, deadline) =
1145+
acquire_execution_lock(&execution_lock, query_timeout).await?;
1146+
let timeout_guard = register_remaining_timeout(&conn, deadline)?;
10911147
let rows = stmt_for_query.query(params).await.map_err(Error::from)?;
10921148
Ok::<_, napi::Error>((rows, execution_guard, timeout_guard))
10931149
};
@@ -1221,8 +1277,9 @@ pub fn statement_get_sync(
12211277
let execution_lock = stmt.execution_lock.clone();
12221278
let result: Result<(Option<libsql::Row>, Option<f64>)> = {
12231279
rt.block_on(async move {
1224-
let _execution_guard = execution_lock.lock_owned().await;
1225-
let _timeout_guard = register_query_timeout(&stmt.conn, query_timeout);
1280+
let (_execution_guard, deadline) =
1281+
acquire_execution_lock(&execution_lock, query_timeout).await?;
1282+
let _timeout_guard = register_remaining_timeout(&stmt.conn, deadline)?;
12261283
let params = map_params(&stmt.stmt, params)?;
12271284
let mut rows = stmt.stmt.query(params).await.map_err(Error::from)?;
12281285
let row = rows.next().await.map_err(Error::from)?;
@@ -1263,8 +1320,9 @@ pub fn statement_run_sync(
12631320
let query_timeout = stmt.resolve_query_timeout(query_options);
12641321
let execution_lock = stmt.execution_lock.clone();
12651322
rt.block_on(async move {
1266-
let _execution_guard = execution_lock.lock_owned().await;
1267-
let _timeout_guard = register_query_timeout(&stmt.conn, query_timeout);
1323+
let (_execution_guard, deadline) =
1324+
acquire_execution_lock(&execution_lock, query_timeout).await?;
1325+
let _timeout_guard = register_remaining_timeout(&stmt.conn, deadline)?;
12681326
let params = map_params(&stmt.stmt, params)?;
12691327
let total_changes_before = stmt.conn.total_changes();
12701328
let start = std::time::Instant::now();
@@ -1302,8 +1360,9 @@ pub fn statement_iterate_sync(
13021360
let inner_stmt = stmt.stmt.clone();
13031361
let iter_stmt = inner_stmt.clone();
13041362
let (rows, column_names, execution_guard, timeout_guard) = rt.block_on(async move {
1305-
let execution_guard = execution_lock.lock_owned().await;
1306-
let timeout_guard = register_query_timeout(&conn, query_timeout);
1363+
let (execution_guard, deadline) =
1364+
acquire_execution_lock(&execution_lock, query_timeout).await?;
1365+
let timeout_guard = register_remaining_timeout(&conn, deadline)?;
13071366
inner_stmt.reset();
13081367
let params = map_params(&inner_stmt, params)?;
13091368
let rows = inner_stmt.query(params).await.map_err(Error::from)?;

0 commit comments

Comments
 (0)