Skip to content

Commit 7fbbf37

Browse files
committed
fix(sqlite-native): surface kv errors from native sqlite
1 parent e25c1b6 commit 7fbbf37

5 files changed

Lines changed: 144 additions & 15 deletions

File tree

rivetkit-typescript/packages/rivetkit-native/index.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ export declare function startEnvoySyncJs(config: JsEnvoyConfig, eventCallback: (
5959
export declare function startEnvoyJs(config: JsEnvoyConfig, eventCallback: (event: any) => void): JsEnvoyHandle
6060
/** Native SQLite database handle exposed to JavaScript. */
6161
export declare class JsNativeDatabase {
62+
takeLastKvError(): string | null
6263
run(sql: string, params?: Array<JsBindParam> | undefined | null): Promise<ExecuteResult>
6364
query(sql: string, params?: Array<JsBindParam> | undefined | null): Promise<QueryResult>
6465
exec(sql: string): Promise<QueryResult>

rivetkit-typescript/packages/rivetkit-native/src/database.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ impl EnvoyKv {
3434

3535
#[async_trait]
3636
impl SqliteKv for EnvoyKv {
37+
fn on_error(&self, actor_id: &str, error: &SqliteKvError) {
38+
tracing::error!(%actor_id, %error, "native sqlite kv operation failed");
39+
}
40+
3741
async fn on_open(&self, _actor_id: &str) -> Result<(), SqliteKvError> {
3842
Ok(())
3943
}
@@ -116,6 +120,14 @@ impl JsNativeDatabase {
116120
.and_then(|guard| guard.as_ref().map(NativeDatabase::as_ptr))
117121
.unwrap_or(ptr::null_mut())
118122
}
123+
124+
fn take_last_kv_error_inner(&self) -> Option<String> {
125+
self
126+
.db
127+
.lock()
128+
.ok()
129+
.and_then(|guard| guard.as_ref().and_then(NativeDatabase::take_last_kv_error))
130+
}
119131
}
120132

121133
#[napi(object)]
@@ -140,6 +152,11 @@ pub struct QueryResult {
140152

141153
#[napi]
142154
impl JsNativeDatabase {
155+
#[napi]
156+
pub fn take_last_kv_error(&self) -> Option<String> {
157+
self.take_last_kv_error_inner()
158+
}
159+
143160
#[napi]
144161
pub async fn run(
145162
&self,

rivetkit-typescript/packages/rivetkit-native/wrapper.js

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,20 @@ function mapRows(rows, columns) {
264264
});
265265
}
266266

267+
async function wrapNativeStorageError(nativeDb, error) {
268+
const lastKvError =
269+
typeof nativeDb.takeLastKvError === "function"
270+
? await nativeDb.takeLastKvError()
271+
: null;
272+
if (!lastKvError) {
273+
throw error;
274+
}
275+
throw new Error(
276+
`Database query failed because the underlying storage is no longer available (${lastKvError}). This usually means the actor is stopping. Use c.abortSignal to cancel long-running work before the actor shuts down.`,
277+
{ cause: error instanceof Error ? error : undefined },
278+
);
279+
}
280+
267281
async function openRawDatabaseFromEnvoy(handle, actorId) {
268282
const nativeDb = await openDatabaseFromEnvoy(handle, actorId);
269283
let closed = false;
@@ -288,15 +302,29 @@ async function openRawDatabaseFromEnvoy(handle, actorId) {
288302
/\bRETURNING\b/i.test(query);
289303

290304
if (returnsRows) {
291-
const result = await nativeDb.query(query, bindings);
305+
let result;
306+
try {
307+
result = await nativeDb.query(query, bindings);
308+
} catch (error) {
309+
await wrapNativeStorageError(nativeDb, error);
310+
}
292311
return mapRows(result.rows, result.columns);
293312
}
294313

295-
await nativeDb.run(query, bindings);
314+
try {
315+
await nativeDb.run(query, bindings);
316+
} catch (error) {
317+
await wrapNativeStorageError(nativeDb, error);
318+
}
296319
return [];
297320
}
298321

299-
const result = await nativeDb.exec(query);
322+
let result;
323+
try {
324+
result = await nativeDb.exec(query);
325+
} catch (error) {
326+
await wrapNativeStorageError(nativeDb, error);
327+
}
300328
return mapRows(result.rows, result.columns);
301329
},
302330
close: async () => {

rivetkit-typescript/packages/sqlite-native/src/sqlite_kv.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ pub struct KvGetResult {
6868
/// at a higher level.
6969
#[async_trait]
7070
pub trait SqliteKv: Send + Sync {
71+
/// Called when a KV operation fails inside a VFS callback before the
72+
/// original error is collapsed into a generic SQLite IO error code.
73+
fn on_error(&self, _actor_id: &str, _error: &SqliteKvError) {}
74+
7175
/// Called when an actor's database is opened.
7276
async fn on_open(&self, _actor_id: &str) -> Result<(), SqliteKvError> {
7377
Ok(())

rivetkit-typescript/packages/sqlite-native/src/vfs.rs

Lines changed: 91 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ use std::ffi::{c_char, c_int, c_void, CStr, CString};
88
use std::ptr;
99
use std::slice;
1010
use std::sync::atomic::{AtomicU64, Ordering};
11-
use std::sync::{Arc, OnceLock};
11+
use std::sync::{Arc, Mutex, OnceLock};
1212

1313
use libsqlite3_sys::*;
1414
use tokio::runtime::Handle;
1515

1616
use crate::kv;
17-
use crate::sqlite_kv::{KvGetResult, SqliteKv};
17+
use crate::sqlite_kv::{KvGetResult, SqliteKv, SqliteKvError};
1818

1919
// MARK: Panic Guard
2020

@@ -158,12 +158,32 @@ struct VfsContext {
158158
actor_id: String,
159159
main_file_name: String,
160160
read_cache_enabled: bool,
161+
last_error: Mutex<Option<String>>,
161162
rt_handle: Handle,
162163
io_methods: Box<sqlite3_io_methods>,
163164
vfs_metrics: Arc<VfsMetrics>,
164165
}
165166

166167
impl VfsContext {
168+
fn clear_last_error(&self) {
169+
if let Ok(mut last_error) = self.last_error.lock() {
170+
*last_error = None;
171+
}
172+
}
173+
174+
fn set_last_error(&self, message: String) {
175+
if let Ok(mut last_error) = self.last_error.lock() {
176+
*last_error = Some(message);
177+
}
178+
}
179+
180+
fn report_kv_error(&self, err: SqliteKvError) -> String {
181+
let message = err.to_string();
182+
self.set_last_error(message.clone());
183+
self.kv.on_error(&self.actor_id, &err);
184+
message
185+
}
186+
167187
fn resolve_file_tag(&self, path: &str) -> Option<u8> {
168188
if path == self.main_file_name {
169189
return Some(kv::FILE_TAG_MAIN);
@@ -187,7 +207,10 @@ impl VfsContext {
187207
let result = self
188208
.rt_handle
189209
.block_on(self.kv.batch_get(&self.actor_id, keys))
190-
.map_err(|e| e.to_string());
210+
.map_err(|err| self.report_kv_error(err));
211+
if result.is_ok() {
212+
self.clear_last_error();
213+
}
191214
let elapsed = start.elapsed();
192215
tracing::debug!(
193216
op = %format_args!("get({key_count}keys)"),
@@ -203,7 +226,10 @@ impl VfsContext {
203226
let result = self
204227
.rt_handle
205228
.block_on(self.kv.batch_put(&self.actor_id, keys, values))
206-
.map_err(|e| e.to_string());
229+
.map_err(|err| self.report_kv_error(err));
230+
if result.is_ok() {
231+
self.clear_last_error();
232+
}
207233
let elapsed = start.elapsed();
208234
tracing::debug!(
209235
op = %format_args!("put({key_count}keys)"),
@@ -219,7 +245,10 @@ impl VfsContext {
219245
let result = self
220246
.rt_handle
221247
.block_on(self.kv.batch_delete(&self.actor_id, keys))
222-
.map_err(|e| e.to_string());
248+
.map_err(|err| self.report_kv_error(err));
249+
if result.is_ok() {
250+
self.clear_last_error();
251+
}
223252
let elapsed = start.elapsed();
224253
tracing::debug!(
225254
op = %format_args!("del({key_count}keys)"),
@@ -234,7 +263,10 @@ impl VfsContext {
234263
let result = self
235264
.rt_handle
236265
.block_on(self.kv.delete_range(&self.actor_id, start, end))
237-
.map_err(|e| e.to_string());
266+
.map_err(|err| self.report_kv_error(err));
267+
if result.is_ok() {
268+
self.clear_last_error();
269+
}
238270
let elapsed = start_time.elapsed();
239271
tracing::debug!(
240272
op = "delRange",
@@ -1164,11 +1196,29 @@ unsafe extern "C" fn kv_vfs_current_time(_p_vfs: *mut sqlite3_vfs, p_time_out: *
11641196
}
11651197

11661198
unsafe extern "C" fn kv_vfs_get_last_error(
1167-
_p_vfs: *mut sqlite3_vfs,
1168-
_n_byte: c_int,
1169-
_z_err_msg: *mut c_char,
1199+
p_vfs: *mut sqlite3_vfs,
1200+
n_byte: c_int,
1201+
z_err_msg: *mut c_char,
11701202
) -> c_int {
1171-
vfs_catch_unwind!(SQLITE_IOERR, SQLITE_OK)
1203+
vfs_catch_unwind!(SQLITE_IOERR, {
1204+
if n_byte <= 0 || z_err_msg.is_null() {
1205+
return 0;
1206+
}
1207+
1208+
let ctx = get_vfs_ctx(p_vfs);
1209+
let last_error = ctx.last_error.lock().ok().and_then(|guard| guard.clone());
1210+
let Some(message) = last_error else {
1211+
*z_err_msg = 0;
1212+
return 0;
1213+
};
1214+
1215+
let bytes = message.as_bytes();
1216+
let max_len = (n_byte as usize).saturating_sub(1);
1217+
let copy_len = bytes.len().min(max_len);
1218+
ptr::copy_nonoverlapping(bytes.as_ptr(), z_err_msg.cast::<u8>(), copy_len);
1219+
*z_err_msg.add(copy_len) = 0;
1220+
0
1221+
})
11721222
}
11731223

11741224
// MARK: KvVfs
@@ -1183,6 +1233,16 @@ unsafe impl Send for KvVfs {}
11831233
unsafe impl Sync for KvVfs {}
11841234

11851235
impl KvVfs {
1236+
fn take_last_kv_error(&self) -> Option<String> {
1237+
unsafe {
1238+
(*self.ctx_ptr)
1239+
.last_error
1240+
.lock()
1241+
.ok()
1242+
.and_then(|mut last_error| last_error.take())
1243+
}
1244+
}
1245+
11861246
pub fn register(
11871247
name: &str,
11881248
kv: Arc<dyn SqliteKv>,
@@ -1210,6 +1270,7 @@ impl KvVfs {
12101270
actor_id: actor_id.clone(),
12111271
main_file_name: actor_id,
12121272
read_cache_enabled: read_cache_enabled(),
1273+
last_error: Mutex::new(None),
12131274
rt_handle,
12141275
io_methods: Box::new(io_methods),
12151276
vfs_metrics,
@@ -1279,6 +1340,10 @@ impl NativeDatabase {
12791340
pub fn as_ptr(&self) -> *mut sqlite3 {
12801341
self.db
12811342
}
1343+
1344+
pub fn take_last_kv_error(&self) -> Option<String> {
1345+
self._vfs.take_last_kv_error()
1346+
}
12821347
}
12831348

12841349
impl Drop for NativeDatabase {
@@ -1291,6 +1356,18 @@ impl Drop for NativeDatabase {
12911356
}
12921357
}
12931358

1359+
fn sqlite_error_message(db: *mut sqlite3) -> String {
1360+
unsafe {
1361+
if db.is_null() {
1362+
"unknown sqlite error".to_string()
1363+
} else {
1364+
CStr::from_ptr(sqlite3_errmsg(db))
1365+
.to_string_lossy()
1366+
.into_owned()
1367+
}
1368+
}
1369+
}
1370+
12941371
pub fn open_database(vfs: KvVfs, file_name: &str) -> Result<NativeDatabase, String> {
12951372
let c_name = CString::new(file_name).map_err(|err| err.to_string())?;
12961373
let mut db: *mut sqlite3 = ptr::null_mut();
@@ -1304,12 +1381,13 @@ pub fn open_database(vfs: KvVfs, file_name: &str) -> Result<NativeDatabase, Stri
13041381
)
13051382
};
13061383
if rc != SQLITE_OK {
1384+
let message = sqlite_error_message(db);
13071385
if !db.is_null() {
13081386
unsafe {
13091387
sqlite3_close(db);
13101388
}
13111389
}
1312-
return Err(format!("sqlite3_open_v2 failed with code {rc}"));
1390+
return Err(format!("sqlite3_open_v2 failed with code {rc}: {message}"));
13131391
}
13141392

13151393
for pragma in &[
@@ -1324,10 +1402,11 @@ pub fn open_database(vfs: KvVfs, file_name: &str) -> Result<NativeDatabase, Stri
13241402
let rc =
13251403
unsafe { sqlite3_exec(db, c_sql.as_ptr(), None, ptr::null_mut(), ptr::null_mut()) };
13261404
if rc != SQLITE_OK {
1405+
let message = sqlite_error_message(db);
13271406
unsafe {
13281407
sqlite3_close(db);
13291408
}
1330-
return Err(format!("{pragma} failed with code {rc}"));
1409+
return Err(format!("{pragma} failed with code {rc}: {message}"));
13311410
}
13321411
}
13331412

0 commit comments

Comments
 (0)