Skip to content

Commit b6e1f9b

Browse files
committed
Replace per-connection execution lock with statement-level interrupts
Query timeouts previously relied on a per-connection execution lock that serialized every operation so the timer wheel's connection.interrupt() would unambiguously hit the timed-out operation. This serialized all concurrent work on a connection and added a mutex acquisition to every call. Use libsql_stmt_interrupt() instead: the timer wheel now interrupts the specific statement that timed out (via a new Interruptible trait, so the wheel can target either a statement or a connection), leaving other concurrent operations on the same connection untouched. This removes execution_lock entirely, along with acquire_execution_lock and the deadline/remaining plumbing it required. Requires libsql 0.10.0-pre.4, which honors libsql_stmt_interrupt per VDBE instruction (pre.3 only checked at step entry and could not abort a long single step such as an aggregate). The per-statement interrupt flag is sticky: sqlite3_step checks it at entry and only reset() clears it, unlike the connection flag SQLite auto-clears each step. We do not reset before every step (that would add overhead to every call); each operation recovers at its existing natural reset point instead - run/iterate reset before stepping, get/get_sync reset after (self-healing). Add tests: statement-level interrupt of a multi-row query and a single-step aggregate, the sticky-flag-needs-reset contract, and a JS test for statement recovery after an interrupt.
1 parent d5691a8 commit b6e1f9b

4 files changed

Lines changed: 159 additions & 137 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ crate-type = ["cdylib"]
1111

1212
[dependencies]
1313
glob-match = "0.2"
14-
libsql = { version = "0.10.0-pre.3", features = ["encryption"] }
14+
libsql = { version = "0.10.0-pre.4", features = ["encryption"] }
1515
napi = { version = "2", default-features = false, features = ["napi6", "tokio_rt", "async"] }
1616
napi-derive = "2"
1717
once_cell = "1.18.0"

integration-tests/tests/async.test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,7 @@ test.serial("Timeout on Statement.get() does not leak into later prepare()/EXPLA
548548
db.close();
549549
});
550550

551-
test.serial("Query timeout covers wait time on the execution lock", async (t) => {
551+
test.serial("Query timeout interrupts long-running queries under concurrency", async (t) => {
552552
t.timeout(15_000);
553553
const [db, errorType] = await connect(":memory:", { defaultQueryTimeout: 200 });
554554
await db.exec("CREATE TABLE t(x INTEGER)");

src/lib.rs

Lines changed: 35 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use std::{
3737
atomic::{AtomicBool, Ordering},
3838
Arc, Mutex,
3939
},
40-
time::{Duration, Instant},
40+
time::Duration,
4141
};
4242
use tokio::runtime::Runtime;
4343
use tracing_subscriber::{filter::LevelFilter, EnvFilter};
@@ -238,8 +238,6 @@ pub struct Database {
238238
memory: bool,
239239
// Maximum time in milliseconds that a query is allowed to run.
240240
query_timeout: Option<Duration>,
241-
// Ensures only one operation executes per connection at a time.
242-
execution_lock: Arc<tokio::sync::Mutex<()>>,
243241
}
244242

245243
impl Drop for Database {
@@ -341,14 +339,12 @@ pub async fn connect(path: String, opts: Option<Options>) -> Result<Database> {
341339
.as_ref()
342340
.and_then(|o| o.defaultQueryTimeout)
343341
.and_then(query_timeout_duration);
344-
let execution_lock = Arc::new(tokio::sync::Mutex::new(()));
345342
Ok(Database {
346343
db: Some(db),
347344
conn: Some(conn),
348345
default_safe_integers,
349346
memory,
350347
query_timeout,
351-
execution_lock,
352348
})
353349
}
354350

@@ -404,17 +400,15 @@ impl Database {
404400
));
405401
}
406402
};
407-
let (_execution_guard, deadline) =
408-
acquire_execution_lock(&self.execution_lock, self.query_timeout).await?;
409-
let timeout_guard = register_remaining_timeout(&conn, deadline)?;
403+
let timeout_guard = register_timeout(&conn, self.query_timeout);
410404
let stmt = match conn.prepare(&sql).await {
411405
Ok(stmt) => stmt,
412406
Err(err) if is_sqlite_interrupt(&err) => {
413407
// Drop our guard before clear_stale_interrupt so the bg thread
414408
// can't fire conn.interrupt() for our id mid-probe.
415409
drop(timeout_guard);
416410
clear_stale_interrupt(&conn).await;
417-
let _retry_guard = register_remaining_timeout(&conn, deadline)?;
411+
let _retry_guard = register_timeout(&conn, self.query_timeout);
418412
conn.prepare(&sql).await.map_err(Error::from)?
419413
}
420414
Err(err) => return Err(Error::from(err).into()),
@@ -425,13 +419,7 @@ impl Database {
425419
pluck: false.into(),
426420
timing: false.into(),
427421
};
428-
Ok(Statement::new(
429-
conn,
430-
stmt,
431-
mode,
432-
self.query_timeout,
433-
self.execution_lock.clone(),
434-
))
422+
Ok(Statement::new(conn, stmt, mode, self.query_timeout))
435423
}
436424

437425
/// Sets the authorizer for the database.
@@ -573,9 +561,7 @@ impl Database {
573561
Some(timeout_ms) => query_timeout_duration(timeout_ms),
574562
None => self.query_timeout,
575563
};
576-
let (_execution_guard, deadline) =
577-
acquire_execution_lock(&self.execution_lock, query_timeout).await?;
578-
let _timeout_guard = register_remaining_timeout(&conn, deadline)?;
564+
let _timeout_guard = register_timeout(&conn, query_timeout);
579565
conn.execute_batch(&sql).await.map_err(Error::from)?;
580566
Ok(())
581567
}
@@ -865,58 +851,18 @@ fn is_sqlite_interrupt(err: &libsql::Error) -> bool {
865851
)
866852
}
867853

868-
fn query_timeout_error() -> napi::Error {
869-
throw_sqlite_error(
870-
"interrupted".to_string(),
871-
"SQLITE_INTERRUPT".to_string(),
872-
libsql::ffi::SQLITE_INTERRUPT,
873-
)
874-
}
875-
876-
fn timeout_deadline(timeout: Duration) -> Instant {
877-
let now = Instant::now();
878-
now.checked_add(timeout)
879-
.unwrap_or_else(|| now + Duration::from_secs(86400))
880-
}
881-
882-
fn register_remaining_timeout(
883-
conn: &Arc<libsql::Connection>,
884-
deadline: Option<Instant>,
885-
) -> Result<Option<QueryTimeoutGuard>> {
886-
match deadline {
887-
Some(deadline) => match deadline.checked_duration_since(Instant::now()) {
888-
Some(remaining) if remaining > Duration::ZERO => {
889-
Ok(Some(QueryTimeoutManager::global().register(conn, remaining)))
890-
}
891-
_ => Err(query_timeout_error()),
892-
},
893-
None => Ok(None),
894-
}
895-
}
896854

897-
/// Acquire the per-connection execution lock, enforcing the query timeout
898-
/// across the queue-wait. Returns the lock guard and the absolute deadline
899-
/// for the current operation. If the timeout elapses while waiting, returns
900-
/// a SQLITE_INTERRUPT error so the caller sees the same behaviour as when a
901-
/// timeout fires during execution.
902-
async fn acquire_execution_lock(
903-
lock: &Arc<tokio::sync::Mutex<()>>,
855+
/// Register a query timeout against `target` for the operation about to run,
856+
/// returning a guard that cancels the timeout when dropped. The target is the
857+
/// statement for per-statement operations (so the timer interrupts only that
858+
/// statement, leaving concurrent operations on the same connection untouched)
859+
/// or the connection for connection-wide operations. Returns `None` when no
860+
/// timeout is in effect.
861+
fn register_timeout<T: query_timeout::Interruptible + 'static>(
862+
target: &Arc<T>,
904863
timeout: Option<Duration>,
905-
) -> Result<(tokio::sync::OwnedMutexGuard<()>, Option<Instant>)> {
906-
let lock = lock.clone();
907-
match timeout {
908-
None => Ok((lock.lock_owned().await, None)),
909-
Some(t) => {
910-
let deadline = timeout_deadline(t);
911-
match tokio::time::timeout(t, lock.lock_owned()).await {
912-
Ok(guard) => match deadline.checked_duration_since(Instant::now()) {
913-
Some(remaining) if remaining > Duration::ZERO => Ok((guard, Some(deadline))),
914-
_ => Err(query_timeout_error()),
915-
},
916-
Err(_) => Err(query_timeout_error()),
917-
}
918-
}
919-
}
864+
) -> Option<QueryTimeoutGuard> {
865+
timeout.map(|t| QueryTimeoutManager::global().register(target, t))
920866
}
921867

922868
async fn clear_stale_interrupt(conn: &Arc<libsql::Connection>) {
@@ -944,8 +890,6 @@ pub struct Statement {
944890
mode: AccessMode,
945891
// Maximum time in milliseconds that a query is allowed to run.
946892
query_timeout: Option<Duration>,
947-
// Shared per-connection execution lock.
948-
execution_lock: Arc<tokio::sync::Mutex<()>>,
949893
}
950894

951895
#[napi]
@@ -962,7 +906,6 @@ impl Statement {
962906
stmt: libsql::Statement,
963907
mode: AccessMode,
964908
query_timeout: Option<Duration>,
965-
execution_lock: Arc<tokio::sync::Mutex<()>>,
966909
) -> Self {
967910
let column_names: Vec<std::ffi::CString> = stmt
968911
.columns()
@@ -976,7 +919,6 @@ impl Statement {
976919
column_names,
977920
mode,
978921
query_timeout,
979-
execution_lock,
980922
}
981923
}
982924

@@ -999,12 +941,9 @@ impl Statement {
999941
let stmt = self.stmt.clone();
1000942
let conn = self.conn.clone();
1001943
let query_timeout = self.resolve_query_timeout(query_options);
1002-
let execution_lock = self.execution_lock.clone();
1003944

1004945
let future = async move {
1005-
let (_execution_guard, deadline) =
1006-
acquire_execution_lock(&execution_lock, query_timeout).await?;
1007-
let _timeout_guard = register_remaining_timeout(&conn, deadline)?;
946+
let _timeout_guard = register_timeout(&stmt, query_timeout);
1008947
stmt.run(params).await.map_err(Error::from)?;
1009948
let changes = if conn.total_changes() == total_changes_before {
1010949
0
@@ -1052,14 +991,10 @@ impl Statement {
1052991

1053992
let stmt = self.stmt.clone();
1054993
let stmt_fut = stmt.clone();
1055-
let conn = self.conn.clone();
1056994
let query_timeout = self.resolve_query_timeout(query_options);
1057-
let execution_lock = self.execution_lock.clone();
1058995
let future = async move {
1059-
let (_execution_guard, deadline) =
1060-
acquire_execution_lock(&execution_lock, query_timeout).await?;
1061996
let result: std::result::Result<(Option<libsql::Row>, Option<f64>), Error> = {
1062-
let _timeout_guard = register_remaining_timeout(&conn, deadline)?;
997+
let _timeout_guard = register_timeout(&stmt_fut, query_timeout);
1063998
async {
1064999
let mut rows = stmt_fut.query(params).await.map_err(Error::from)?;
10651000
let row = rows.next().await.map_err(Error::from)?;
@@ -1137,32 +1072,24 @@ impl Statement {
11371072
let params = map_params(&stmt, params)?;
11381073
let stmt_for_query = self.stmt.clone();
11391074
let stmt_for_iter = stmt_for_query.clone();
1140-
let conn = self.conn.clone();
11411075
let query_timeout = self.resolve_query_timeout(query_options);
1142-
let execution_lock = self.execution_lock.clone();
11431076
let future = async move {
1144-
let (execution_guard, deadline) =
1145-
acquire_execution_lock(&execution_lock, query_timeout).await?;
1146-
let timeout_guard = register_remaining_timeout(&conn, deadline)?;
1077+
let timeout_guard = register_timeout(&stmt_for_query, query_timeout);
11471078
let rows = stmt_for_query.query(params).await.map_err(Error::from)?;
1148-
Ok::<_, napi::Error>((rows, execution_guard, timeout_guard))
1079+
Ok::<_, napi::Error>((rows, timeout_guard))
11491080
};
11501081
let column_names = self.column_names.clone();
1151-
env.execute_tokio_future(
1152-
future,
1153-
move |&mut _env, (result, execution_guard, timeout_guard)| {
1154-
Ok(RowsIterator::new(
1155-
Arc::new(tokio::sync::Mutex::new(result)),
1156-
stmt_for_iter,
1157-
column_names,
1158-
safe_ints,
1159-
raw,
1160-
pluck,
1161-
timeout_guard,
1162-
execution_guard,
1163-
))
1164-
},
1165-
)
1082+
env.execute_tokio_future(future, move |&mut _env, (result, timeout_guard)| {
1083+
Ok(RowsIterator::new(
1084+
Arc::new(tokio::sync::Mutex::new(result)),
1085+
stmt_for_iter,
1086+
column_names,
1087+
safe_ints,
1088+
raw,
1089+
pluck,
1090+
timeout_guard,
1091+
))
1092+
})
11661093
}
11671094

11681095
#[napi]
@@ -1274,12 +1201,9 @@ pub fn statement_get_sync(
12741201

12751202
let rt = runtime()?;
12761203
let query_timeout = stmt.resolve_query_timeout(query_options);
1277-
let execution_lock = stmt.execution_lock.clone();
12781204
let result: Result<(Option<libsql::Row>, Option<f64>)> = {
12791205
rt.block_on(async move {
1280-
let (_execution_guard, deadline) =
1281-
acquire_execution_lock(&execution_lock, query_timeout).await?;
1282-
let _timeout_guard = register_remaining_timeout(&stmt.conn, deadline)?;
1206+
let _timeout_guard = register_timeout(&stmt.stmt, query_timeout);
12831207
let params = map_params(&stmt.stmt, params)?;
12841208
let mut rows = stmt.stmt.query(params).await.map_err(Error::from)?;
12851209
let row = rows.next().await.map_err(Error::from)?;
@@ -1318,11 +1242,8 @@ pub fn statement_run_sync(
13181242
stmt.stmt.reset();
13191243
let rt = runtime()?;
13201244
let query_timeout = stmt.resolve_query_timeout(query_options);
1321-
let execution_lock = stmt.execution_lock.clone();
13221245
rt.block_on(async move {
1323-
let (_execution_guard, deadline) =
1324-
acquire_execution_lock(&execution_lock, query_timeout).await?;
1325-
let _timeout_guard = register_remaining_timeout(&stmt.conn, deadline)?;
1246+
let _timeout_guard = register_timeout(&stmt.stmt, query_timeout);
13261247
let params = map_params(&stmt.stmt, params)?;
13271248
let total_changes_before = stmt.conn.total_changes();
13281249
let start = std::time::Instant::now();
@@ -1355,14 +1276,10 @@ pub fn statement_iterate_sync(
13551276
let raw = stmt.mode.raw.load(Ordering::SeqCst);
13561277
let pluck = stmt.mode.pluck.load(Ordering::SeqCst);
13571278
let query_timeout = stmt.resolve_query_timeout(query_options);
1358-
let conn = stmt.conn.clone();
1359-
let execution_lock = stmt.execution_lock.clone();
13601279
let inner_stmt = stmt.stmt.clone();
13611280
let iter_stmt = inner_stmt.clone();
1362-
let (rows, column_names, execution_guard, timeout_guard) = rt.block_on(async move {
1363-
let (execution_guard, deadline) =
1364-
acquire_execution_lock(&execution_lock, query_timeout).await?;
1365-
let timeout_guard = register_remaining_timeout(&conn, deadline)?;
1281+
let (rows, column_names, timeout_guard) = rt.block_on(async move {
1282+
let timeout_guard = register_timeout(&inner_stmt, query_timeout);
13661283
inner_stmt.reset();
13671284
let params = map_params(&inner_stmt, params)?;
13681285
let rows = inner_stmt.query(params).await.map_err(Error::from)?;
@@ -1371,7 +1288,7 @@ pub fn statement_iterate_sync(
13711288
column_names
13721289
.push(std::ffi::CString::new(rows.column_name(i).unwrap().to_string()).unwrap());
13731290
}
1374-
Ok::<_, napi::Error>((rows, column_names, execution_guard, timeout_guard))
1291+
Ok::<_, napi::Error>((rows, column_names, timeout_guard))
13751292
})?;
13761293
Ok(RowsIterator::new(
13771294
Arc::new(tokio::sync::Mutex::new(rows)),
@@ -1381,7 +1298,6 @@ pub fn statement_iterate_sync(
13811298
raw,
13821299
pluck,
13831300
timeout_guard,
1384-
execution_guard,
13851301
))
13861302
}
13871303

@@ -1530,7 +1446,6 @@ pub struct RowsIterator {
15301446
raw: bool,
15311447
pluck: bool,
15321448
timeout_guard: Mutex<Option<QueryTimeoutGuard>>,
1533-
execution_guard: Mutex<Option<tokio::sync::OwnedMutexGuard<()>>>,
15341449
}
15351450

15361451
#[napi]
@@ -1543,7 +1458,6 @@ impl RowsIterator {
15431458
raw: bool,
15441459
pluck: bool,
15451460
timeout_guard: Option<QueryTimeoutGuard>,
1546-
execution_guard: tokio::sync::OwnedMutexGuard<()>,
15471461
) -> Self {
15481462
Self {
15491463
rows,
@@ -1553,7 +1467,6 @@ impl RowsIterator {
15531467
raw,
15541468
pluck,
15551469
timeout_guard: Mutex::new(timeout_guard),
1556-
execution_guard: Mutex::new(Some(execution_guard)),
15571470
}
15581471
}
15591472

@@ -1588,8 +1501,6 @@ impl RowsIterator {
15881501
self.stmt.reset();
15891502
let mut timeout_guard = self.timeout_guard.lock().unwrap();
15901503
timeout_guard.take();
1591-
let mut execution_guard = self.execution_guard.lock().unwrap();
1592-
execution_guard.take();
15931504
}
15941505
}
15951506

0 commit comments

Comments
 (0)