Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions compat.js
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,39 @@ class Statement {
try {
const { params, queryOptions } = splitBindParameters(bindParameters);
const it = statementIterateSync(this.stmt, params, queryOptions);
let closed = false;
const close = () => {
if (closed) {
return;
}
closed = true;
if (typeof it.close === "function") {
it.close();
}
};
const next = () => {
try {
const record = iteratorNextSync(it);
if (record.done) {
close();
}
return record;
} catch (err) {
close();
throw err;
}
};
return {
next: () => iteratorNextSync(it),
[Symbol.iterator]() {
next,
return(value) {
close();
return {
next: () => iteratorNextSync(it),
}
done: true,
value,
};
},
[Symbol.iterator]() {
return this;
},
};
} catch (err) {
Expand Down
27 changes: 27 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,32 @@ export declare class Database {
* - Legacy format: `{ [tableName: string]: 0 | 1 }`
* - Full format: `{ rules: AuthRule[], defaultPolicy?: 0 | 1 | 2 }`
* - `null` to remove the authorizer
*
* Pattern fields (`table`, `column`, `entity`) accept a plain string for
* exact matching, or `{ glob: "pattern" }` for glob matching with `*` and `?`.
*
* # Examples
*
* ```javascript
* const { Authorization, Action } = require('libsql');
*
* // Legacy table-level allow/deny
* db.authorizer({ "users": Authorization.ALLOW });
*
* // Rule-based with glob patterns
* db.authorizer({
* rules: [
* { action: Action.READ, table: "users", column: "password", policy: Authorization.IGNORE },
* { action: Action.INSERT, table: { glob: "logs_*" }, policy: Authorization.ALLOW },
* { action: Action.READ, policy: Authorization.ALLOW },
* { action: Action.SELECT, policy: Authorization.ALLOW },
* ],
* defaultPolicy: Authorization.DENY,
* });
*
* // Remove authorizer
* db.authorizer(null);
* ```
*/
authorizer(config: unknown): void
/**
Expand Down Expand Up @@ -173,6 +199,7 @@ export declare class Statement {
}
/** A raw iterator over rows. The JavaScript layer wraps this in a iterable. */
export declare class RowsIterator {
close(): void
next(): Promise<Record>
}
export declare class Record {
Expand Down
22 changes: 22 additions & 0 deletions integration-tests/tests/async.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,28 @@ test.serial("Query timeout option allows short-running query", async (t) => {
db.close();
});

test.serial("Stale timeout guard from exhausted iterator does not interrupt later queries", async (t) => {
const TIMEOUT = 500;
const [db] = await connect(":memory:", { defaultQueryTimeout: TIMEOUT });
await db.exec("CREATE TABLE t(x INTEGER)");
const insert = await db.prepare("INSERT INTO t VALUES (?)");
for (let i = 0; i < 10000; i++) {
await insert.run(i);
}

// Run many sequential queries via stmt.all() (which uses iterate() internally).
// Each query finishes well under the timeout, but if the RowsIterator's
// TimeoutGuard is not released until GC, stale guards will fire and
// interrupt unrelated later queries.
const stmt = await db.prepare("SELECT * FROM t ORDER BY x ASC");
for (let i = 0; i < 100; i++) {
const rows = await stmt.all();
t.is(rows.length, 10000);
}

db.close();
});

test.serial("Per-query timeout option interrupts long-running Statement.all()", async (t) => {
const [db, errorType] = await connect(":memory:");
const stmt = await db.prepare(
Expand Down
37 changes: 31 additions & 6 deletions promise.js
Original file line number Diff line number Diff line change
Expand Up @@ -354,16 +354,41 @@ class Statement {
try {
const { params, queryOptions } = splitBindParameters(bindParameters);
const it = await this.stmt.iterate(params, queryOptions);
let closed = false;
const close = () => {
if (closed) {
return;
}
closed = true;
if (typeof it.close === "function") {
it.close();
}
};
const next = async () => {
try {
const record = await it.next();
if (record.done) {
close();
}
return record;
} catch (err) {
close();
throw err;
}
};
return {
next() {
return it.next();
return next();
},
return(value) {
close();
return Promise.resolve({
done: true,
value,
});
},
[Symbol.asyncIterator]() {
return {
next() {
return it.next();
}
};
return this;
}
};
} catch (err) {
Expand Down
27 changes: 23 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use std::{
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
Arc, Mutex,
},
time::Duration,
};
Expand Down Expand Up @@ -1391,7 +1391,7 @@ pub struct RowsIterator {
safe_ints: bool,
raw: bool,
pluck: bool,
_timeout_guard: Option<TimeoutGuard>,
timeout_guard: Mutex<Option<TimeoutGuard>>,
}

#[napi]
Expand All @@ -1410,14 +1410,33 @@ impl RowsIterator {
safe_ints,
raw,
pluck,
_timeout_guard: timeout_guard,
timeout_guard: Mutex::new(timeout_guard),
}
}

fn release_timeout_guard(&self) {
let mut guard = self.timeout_guard.lock().unwrap();
guard.take();
}

#[napi]
pub fn close(&self) {
self.release_timeout_guard();
}

#[napi]
pub async fn next(&self) -> Result<Record> {
let mut rows = self.rows.lock().await;
let row = rows.next().await.map_err(Error::from)?;
let row = match rows.next().await {
Ok(row) => row,
Err(err) => {
self.release_timeout_guard();
return Err(Error::from(err).into());
}
};
if row.is_none() {
self.release_timeout_guard();
}
Ok(Record {
row,
column_names: self.column_names.clone(),
Expand Down
Loading