Skip to content

Commit 4f5887c

Browse files
committed
Add new Connection API to install SQLite update hooks.
1 parent e673973 commit 4f5887c

File tree

6 files changed

+147
-5
lines changed

6 files changed

+147
-5
lines changed

libsql/src/connection.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,13 @@ use crate::statement::Statement;
99
use crate::transaction::Transaction;
1010
use crate::{Result, TransactionBehavior};
1111

12+
#[derive(Clone, Copy, Debug, PartialEq)]
13+
pub enum Op {
14+
Insert = 0,
15+
Delete = 1,
16+
Update = 2,
17+
}
18+
1219
#[async_trait::async_trait]
1320
pub(crate) trait Conn {
1421
async fn execute(&self, sql: &str, params: Params) -> Result<u64>;
@@ -38,6 +45,10 @@ pub(crate) trait Conn {
3845
fn load_extension(&self, _dylib_path: &Path, _entry_point: Option<&str>) -> Result<()> {
3946
Err(crate::Error::LoadExtensionNotSupported)
4047
}
48+
49+
fn add_update_hook(&self, _cb: Box<dyn Fn(Op, &str, &str, i64) + Send + Sync>) -> Result<()> {
50+
Err(crate::Error::UpdateHookNotSupported)
51+
}
4152
}
4253

4354
/// A set of rows returned from `execute_batch`/`execute_transactional_batch`. It is essentially
@@ -244,6 +255,13 @@ impl Connection {
244255
) -> Result<()> {
245256
self.conn.load_extension(dylib_path.as_ref(), entry_point)
246257
}
258+
259+
pub fn add_update_hook(
260+
&self,
261+
cb: Box<dyn Fn(Op, &str, &str, i64) + Send + Sync>,
262+
) -> Result<()> {
263+
self.conn.add_update_hook(cb)
264+
}
247265
}
248266

249267
impl fmt::Debug for Connection {

libsql/src/errors.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ pub enum Error {
2121
SyncNotSupported(String), // Not in rusqlite
2222
#[error("Loading extension is only supported in local databases.")]
2323
LoadExtensionNotSupported, // Not in rusqlite
24+
#[error("Update hooks are only supported in local databases.")]
25+
UpdateHookNotSupported, // Not in rusqlite
2426
#[error("Column not found: {0}")]
2527
ColumnNotFound(i32), // Not in rusqlite
2628
#[error("Hrana: `{0}`")]

libsql/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ cfg_hrana! {
175175
}
176176

177177
pub use self::{
178-
connection::{BatchRows, Connection},
178+
connection::{BatchRows, Connection, Op},
179179
database::{Builder, Database},
180180
load_extension_guard::LoadExtensionGuard,
181181
rows::{Column, Row, Rows},

libsql/src/local/connection.rs

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22

33
use crate::local::rows::BatchedRows;
44
use crate::params::Params;
5-
use crate::{connection::BatchRows, errors};
5+
use crate::{
6+
connection::{BatchRows, Op},
7+
errors,
8+
};
69

710
use super::{Database, Error, Result, Rows, RowsFuture, Statement, Transaction};
811

@@ -11,6 +14,10 @@ use crate::TransactionBehavior;
1114
use libsql_sys::ffi;
1215
use std::{ffi::c_int, fmt, path::Path, sync::Arc};
1316

17+
struct Container {
18+
cb: Box<dyn Fn(Op, &str, &str, i64) + Send + Sync>,
19+
}
20+
1421
/// A connection to a libSQL database.
1522
#[derive(Clone)]
1623
pub struct Connection {
@@ -384,6 +391,24 @@ impl Connection {
384391
})
385392
}
386393

394+
/// Installs update hook
395+
pub fn add_update_hook(&self, cb: Box<dyn Fn(Op, &str, &str, i64) + Send + Sync>) {
396+
let c = Box::new(Container { cb });
397+
let ptr: *mut Container = std::ptr::from_mut(Box::leak(c));
398+
399+
let old_data = unsafe {
400+
ffi::sqlite3_update_hook(
401+
self.raw,
402+
Some(update_hook_cb),
403+
ptr as *mut ::std::os::raw::c_void,
404+
)
405+
};
406+
407+
if !old_data.is_null() {
408+
let _ = unsafe { Box::from_raw(old_data as *mut Container) };
409+
}
410+
}
411+
387412
pub fn enable_load_extension(&self, onoff: bool) -> Result<()> {
388413
// SQLITE_DBCONFIG_ENABLE_LOAD_EXTENSION configration verb accepts 2 additional parameters: an on/off flag and a pointer to an c_int where new state of the parameter will be written (or NULL if reporting back the setting is not needed)
389414
// See: https://sqlite.org/c3ref/c_dbconfig_defensive.html#sqlitedbconfigenableloadextension
@@ -489,3 +514,25 @@ impl fmt::Debug for Connection {
489514
f.debug_struct("Connection").finish()
490515
}
491516
}
517+
518+
#[no_mangle]
519+
extern "C" fn update_hook_cb(
520+
data: *mut ::std::os::raw::c_void,
521+
op: ::std::os::raw::c_int,
522+
db_name: *const ::std::os::raw::c_char,
523+
table_name: *const ::std::os::raw::c_char,
524+
row_id: i64,
525+
) {
526+
let db = unsafe { std::ffi::CStr::from_ptr(db_name).to_string_lossy() };
527+
let table = unsafe { std::ffi::CStr::from_ptr(table_name).to_string_lossy() };
528+
529+
let c = unsafe { &mut *(data as *mut Container) };
530+
let o = match op {
531+
9 => Op::Delete,
532+
18 => Op::Insert,
533+
23 => Op::Update,
534+
_ => unreachable!("Unknown operation {op}"),
535+
};
536+
537+
(*c.cb)(o, &db, &table, row_id);
538+
}

libsql/src/local/impls.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
use std::sync::Arc;
22
use std::{fmt, path::Path};
33

4-
use crate::connection::BatchRows;
4+
use crate::connection::{Conn, BatchRows, Op};
55
use crate::{
6-
connection::Conn,
76
params::Params,
87
rows::{ColumnsInner, RowInner, RowsInner},
98
statement::Stmt,
@@ -79,6 +78,10 @@ impl Conn for LibsqlConnection {
7978
fn load_extension(&self, dylib_path: &Path, entry_point: Option<&str>) -> Result<()> {
8079
self.conn.load_extension(dylib_path, entry_point)
8180
}
81+
82+
fn add_update_hook(&self, cb: Box<dyn Fn(Op, &str, &str, i64) + Send + Sync>) -> Result<()> {
83+
Ok(self.conn.add_update_hook(cb))
84+
}
8285
}
8386

8487
impl Drop for LibsqlConnection {

libsql/tests/integration_tests.rs

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ use futures::{StreamExt, TryStreamExt};
44
use libsql::{
55
named_params, params,
66
params::{IntoParams, IntoValue},
7-
Connection, Database, Value,
7+
Connection, Database, Op, Value,
88
};
99
use rand::distributions::Uniform;
1010
use rand::prelude::*;
1111
use std::collections::HashSet;
12+
use std::sync::{Arc, Mutex};
1213

1314
async fn setup() -> Connection {
1415
let db = Database::open(":memory:").unwrap();
@@ -27,6 +28,77 @@ async fn enable_disable_extension() {
2728
conn.load_extension_disable().unwrap();
2829
}
2930

31+
#[tokio::test]
32+
async fn add_update_hook() {
33+
let conn = setup().await;
34+
35+
#[derive(PartialEq, Debug)]
36+
struct Data {
37+
op: Op,
38+
db: String,
39+
table: String,
40+
row_id: i64,
41+
}
42+
43+
let d = Arc::new(Mutex::new(None::<Data>));
44+
45+
let d_clone = d.clone();
46+
conn.add_update_hook(Box::new(move |op, db, table, row_id| {
47+
*d_clone.lock().unwrap() = Some(Data {
48+
op,
49+
db: db.to_string(),
50+
table: table.to_string(),
51+
row_id,
52+
});
53+
}))
54+
.unwrap();
55+
56+
let _ = conn
57+
.execute("INSERT INTO users (id, name) VALUES (2, 'Alice')", ())
58+
.await
59+
.unwrap();
60+
61+
assert_eq!(
62+
*d.lock().unwrap().as_ref().unwrap(),
63+
Data {
64+
op: Op::Insert,
65+
db: "main".to_string(),
66+
table: "users".to_string(),
67+
row_id: 1,
68+
}
69+
);
70+
71+
let _ = conn
72+
.execute("UPDATE users SET name = 'Bob' WHERE id = 2", ())
73+
.await
74+
.unwrap();
75+
76+
assert_eq!(
77+
*d.lock().unwrap().as_ref().unwrap(),
78+
Data {
79+
op: Op::Update,
80+
db: "main".to_string(),
81+
table: "users".to_string(),
82+
row_id: 1,
83+
}
84+
);
85+
86+
let _ = conn
87+
.execute("DELETE FROM users WHERE id = 2", ())
88+
.await
89+
.unwrap();
90+
91+
assert_eq!(
92+
*d.lock().unwrap().as_ref().unwrap(),
93+
Data {
94+
op: Op::Delete,
95+
db: "main".to_string(),
96+
table: "users".to_string(),
97+
row_id: 1,
98+
}
99+
);
100+
}
101+
30102
#[tokio::test]
31103
async fn connection_drops_before_statements() {
32104
let db = Database::open(":memory:").unwrap();

0 commit comments

Comments
 (0)