Skip to content

Commit 105045c

Browse files
committed
Update async-sqlite
1 parent e7d57af commit 105045c

5 files changed

Lines changed: 90 additions & 207 deletions

File tree

examples/async-sqlite/README.md

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ The Async SQLite example implements a simple database in Rust with a JavaScript
55
## Usage
66

77
```js
8-
const Database = require(".");
8+
const { Database } = require(".");
99

1010
(async () => {
1111
const db = new Database();
@@ -31,25 +31,18 @@ Since SQLite is naturally single threaded, our application does not benefit from
3131

3232
Once the database thread is spawned, the JavaScript main thread needs a way to communicate with it. A [multi-producer, single-consumer (mpsc)][mpsc] channel is created. The receiving end is owned by the database thread and the sender is held by JavaScript.
3333

34-
#### `JsBox`
34+
#### `#[neon::export(class)]`
3535

36-
Rust data cannot be directly held by JavaScript. The [`JsBox`][jsbox] provides a mechanism for allowing JavaScript to hold a reference to Rust data and later access it again from Rust.
36+
The Rust sender side of the channel is held in a JavaScript [class][class]. It can be referenced later in methods.
3737

3838
#### Rust and Neon Channels
3939

4040
The mpsc channel provides a way for the JavaScript main thread to communicate with the database thread, but it is one-way. In order to complete the callback, the database thread must be able to communicate with the JavaScript main thread. [`neon::event::Channel`][channel] provides a channel for sending these events back.
4141

42-
#### `Root`
43-
44-
The last issue to solve is sending a reference to the JavaScript callback to the database thread and back again before finally calling it. [Handles][handle] to JavaScript values are not `Send`; they cannot escape the scope that created them. The reason they cannot be passed to other threads is because when control is returned back to the JavaScript engine, the garbage collector may determine they are no longer used and free the value.
45-
46-
A [`Root`][root] is a special handle to a JavaScript value that prevents the value from being freed as long as the `Root` has not been dropped. By placing the callback in a `Root`, it can be safely sent across threads and finally accessed and called when back on the JavaScript main thread.
47-
4842
### JavaScript
4943

5044
[thread]: https://doc.rust-lang.org/std/thread/
5145
[mpsc]: https://doc.rust-lang.org/std/sync/mpsc/index.html
52-
[jsbox]: https://docs.rs/neon/latest/neon/types/struct.JsBox.html
46+
[class]: https://docs.rs/neon/latest/neon/attr.class.html
5347
[channel]: https://docs.rs/neon/latest/neon/event/struct.Channel.html
5448
[handle]: https://docs.rs/neon/latest/neon/handle/struct.Handle.html
55-
[root]: https://docs.rs/neon/latest/neon/handle/struct.Root.html

examples/async-sqlite/index.js

Lines changed: 0 additions & 29 deletions
This file was deleted.

examples/async-sqlite/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"name": "async-sqlite",
33
"version": "0.1.0",
44
"description": "Neon Async SQLite",
5-
"main": "index.js",
5+
"main": "index.node",
66
"scripts": {
77
"build": "cargo-cp-artifact -nc index.node -- cargo build --message-format=json-render-diagnostics",
88
"install": "npm run build",

examples/async-sqlite/src/lib.rs

Lines changed: 82 additions & 162 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,70 @@
1-
use std::sync::mpsc;
2-
use std::thread;
1+
use std::{ops::ControlFlow, sync::mpsc, thread};
2+
3+
use neon::{
4+
prelude::*,
5+
types::Deferred,
6+
types::extract::{Error, TryIntoJs},
7+
};
38

4-
use neon::{prelude::*, types::Deferred};
59
use rusqlite::Connection;
610

7-
type DbCallback = Box<dyn FnOnce(&mut Connection, &Channel, Deferred) + Send>;
11+
type DbCallback =
12+
Box<dyn FnOnce(&mut Connection, &Channel, Deferred) -> ControlFlow<(), ()> + Send>;
813

914
// Wraps a SQLite connection a channel, allowing concurrent access
1015
struct Database {
11-
tx: mpsc::Sender<DbMessage>,
12-
}
13-
14-
// Messages sent on the database channel
15-
enum DbMessage {
16-
// Promise to resolve and callback to be executed
17-
// Deferred is threaded through the message instead of moved to the closure so that it
18-
// can be manually rejected.
19-
Callback(Deferred, DbCallback),
20-
// Indicates that the thread should be stopped and connection closed
21-
Close,
16+
tx: mpsc::Sender<(Deferred, DbCallback)>,
2217
}
2318

24-
// Clean-up when Database is garbage collected, could go here
25-
// but, it's not needed,
26-
impl Finalize for Database {}
27-
2819
// Internal implementation
2920
impl Database {
30-
// Creates a new instance of `Database`
31-
//
32-
// 1. Creates a connection and a channel
33-
// 2. Spawns a thread and moves the channel receiver and connection to it
34-
// 3. On a separate thread, read closures off the channel and execute with access
35-
// to the connection.
36-
fn new<'a, C>(cx: &mut C) -> rusqlite::Result<Self>
21+
fn exec<'cx, O, V, F>(&self, cx: &mut Cx<'cx>, f: F) -> JsResult<'cx, JsPromise>
3722
where
38-
C: Context<'a>,
23+
F: FnOnce(&mut Connection) -> O + Send + 'static,
24+
for<'a> O: TryIntoJs<'a, Value = V> + Send + 'static,
25+
V: Value,
3926
{
27+
self.send(cx, |conn| ControlFlow::Continue(f(conn)))
28+
}
29+
30+
fn send<'cx, O, V, F>(&self, cx: &mut Cx<'cx>, f: F) -> JsResult<'cx, JsPromise>
31+
where
32+
F: FnOnce(&mut Connection) -> ControlFlow<(), O> + Send + 'static,
33+
for<'a> O: TryIntoJs<'a, Value = V> + Send + 'static,
34+
V: Value,
35+
{
36+
let (d, promise) = cx.promise();
37+
let Err(mpsc::SendError((d, _))) = self.tx.send((
38+
d,
39+
Box::new(move |conn, ch, d| {
40+
let output = f(conn).continue_value();
41+
let should_break = output.is_none();
42+
let res = d.try_settle_with(ch, move |mut cx| output.try_into_js(&mut cx));
43+
44+
// If we can no longer settle promises, we can stop the worker thread
45+
if res.is_err() || should_break {
46+
ControlFlow::Break(())
47+
} else {
48+
ControlFlow::Continue(())
49+
}
50+
}),
51+
)) else {
52+
return Ok(promise);
53+
};
54+
55+
let err = cx.error("Database is closed")?;
56+
d.reject(cx, err);
57+
58+
Ok(promise)
59+
}
60+
}
61+
62+
#[neon::export(class)]
63+
// JavaScript class
64+
impl Database {
65+
fn new(cx: &mut Cx) -> Result<Self, Error> {
4066
// Channel for sending callbacks to execute on the sqlite connection thread
41-
let (tx, rx) = mpsc::channel::<DbMessage>();
67+
let (tx, rx) = mpsc::channel();
4268

4369
// Open a connection sqlite, this will be moved to the thread
4470
let mut conn = Connection::open_in_memory()?;
@@ -48,6 +74,7 @@ impl Database {
4874
// The JavaScript process will not exit as long as this channel has not been
4975
// dropped.
5076
let channel = cx.channel();
77+
let db = Self { tx };
5178

5279
// Create a table in the in-memory database
5380
// In production code, this would likely be handled somewhere else
@@ -69,155 +96,48 @@ impl Database {
6996
// When the instance of `Database` is dropped, the channel will be closed
7097
// and `rx.recv()` will return an `Err`, ending the loop and terminating
7198
// the thread.
72-
while let Ok(message) = rx.recv() {
73-
match message {
74-
DbMessage::Callback(deferred, f) => {
75-
// The connection and channel are owned by the thread, but _lent_ to
76-
// the callback. The callback has exclusive access to the connection
77-
// for the duration of the callback.
78-
f(&mut conn, &channel, deferred);
79-
}
80-
// Immediately close the connection, even if there are pending messages
81-
DbMessage::Close => break,
99+
while let Ok((d, callback)) = rx.recv() {
100+
// Returning `true` means to _stop_
101+
if callback(&mut conn, &channel, d).is_break() {
102+
break;
82103
}
83104
}
84105
});
85106

86-
Ok(Self { tx })
87-
}
88-
89-
// Idiomatic rust would take an owned `self` to prevent use after close
90-
// However, it's not possible to prevent JavaScript from continuing to hold a closed database
91-
fn close(&self) -> Result<(), mpsc::SendError<DbMessage>> {
92-
self.tx.send(DbMessage::Close)
93-
}
94-
95-
fn send(
96-
&self,
97-
deferred: Deferred,
98-
callback: impl FnOnce(&mut Connection, &Channel, Deferred) + Send + 'static,
99-
) -> Result<(), mpsc::SendError<DbMessage>> {
100-
self.tx
101-
.send(DbMessage::Callback(deferred, Box::new(callback)))
102-
}
103-
}
104-
105-
// Methods exposed to JavaScript
106-
// The `JsBox` boxed `Database` is expected as the `this` value on all methods except `js_new`
107-
impl Database {
108-
// Create a new instance of `Database` and place it inside a `JsBox`
109-
// JavaScript can hold a reference to a `JsBox`, but the contents are opaque
110-
fn js_new(mut cx: FunctionContext) -> JsResult<JsBox<Database>> {
111-
let db = Database::new(&mut cx).or_else(|err| cx.throw_error(err.to_string()))?;
112-
113-
Ok(cx.boxed(db))
114-
}
115-
116-
// Manually close a database connection
117-
// After calling `close`, all other methods will fail
118-
// It is not necessary to call `close` since the database will be closed when the wrapping
119-
// `JsBox` is garbage collected. However, calling `close` allows the process to exit
120-
// immediately instead of waiting on garbage collection. This is useful in tests.
121-
fn js_close(mut cx: FunctionContext) -> JsResult<JsUndefined> {
122-
// Get the `this` value as a `JsBox<Database>`
123-
cx.this::<JsBox<Database>>()?
124-
.close()
125-
.or_else(|err| cx.throw_error(err.to_string()))?;
126-
127-
Ok(cx.undefined())
107+
Ok(db)
128108
}
129109

130110
// Inserts a `name` into the database
131111
// Accepts a `name` and returns a `Promise`
132-
fn js_insert(mut cx: FunctionContext) -> JsResult<JsPromise> {
133-
// Get the first argument as a `JsString` and convert to a Rust `String`
134-
let name = cx.argument::<JsString>(0)?.value(&mut cx);
135-
136-
// Get the `this` value as a `JsBox<Database>`
137-
let db = cx.this::<JsBox<Database>>()?;
138-
let (deferred, promise) = cx.promise();
139-
140-
db.send(deferred, move |conn, channel, deferred| {
141-
let result = conn
142-
.execute(
143-
"INSERT INTO person (name) VALUES (?)",
144-
rusqlite::params![name],
145-
)
146-
.map(|_| conn.last_insert_rowid());
147-
148-
deferred.settle_with(channel, move |mut cx| {
149-
let id = result.or_else(|err| cx.throw_error(err.to_string()))?;
150-
151-
Ok(cx.number(id as f64))
152-
});
112+
fn insert<'cx>(&self, cx: &mut Cx<'cx>, name: String) -> JsResult<'cx, JsPromise> {
113+
self.exec(cx, move |conn| -> Result<_, Error> {
114+
conn.execute(
115+
"INSERT INTO person (name) VALUES (?)",
116+
rusqlite::params![name],
117+
)?;
118+
119+
Ok(conn.last_insert_rowid() as f64)
153120
})
154-
.into_rejection(&mut cx)?;
155-
156-
Ok(promise)
157121
}
158122

159123
// Get a `name` by `id` value
160124
// Accepts an `id` and callback as parameters
161-
fn js_get_by_id(mut cx: FunctionContext) -> JsResult<JsPromise> {
162-
// Get the first argument as a `JsNumber` and convert to an `f64`
163-
let id = cx.argument::<JsNumber>(0)?.value(&mut cx);
164-
165-
// Get the `this` value as a `JsBox<Database>`
166-
let db = cx.this::<JsBox<Database>>()?;
167-
let (deferred, promise) = cx.promise();
168-
169-
db.send(deferred, move |conn, channel, deferred| {
170-
let result: Result<String, _> = conn
171-
.prepare("SELECT name FROM person WHERE id = ?")
172-
.and_then(|mut stmt| stmt.query_row(rusqlite::params![id], |row| row.get(0)));
173-
174-
deferred.settle_with(channel, move |mut cx| -> JsResult<JsValue> {
175-
// If the row was not found, return `undefined` as a success instead
176-
// of throwing an exception
177-
if matches!(result, Err(rusqlite::Error::QueryReturnedNoRows)) {
178-
return Ok(cx.undefined().upcast());
179-
}
180-
181-
let name = result.or_else(|err| cx.throw_error(err.to_string()))?;
182-
183-
Ok(cx.string(name).upcast())
184-
});
185-
})
186-
.into_rejection(&mut cx)?;
187-
188-
Ok(promise)
189-
}
190-
}
191-
192-
trait SendResultExt {
193-
// Sending a query closure to execute may fail if the channel has been closed.
194-
// This method converts the failure into a promise rejection.
195-
fn into_rejection<'a, C: Context<'a>>(self, cx: &mut C) -> NeonResult<()>;
196-
}
197-
198-
impl SendResultExt for Result<(), mpsc::SendError<DbMessage>> {
199-
fn into_rejection<'a, C: Context<'a>>(self, cx: &mut C) -> NeonResult<()> {
200-
self.or_else(|err| {
201-
let msg = err.to_string();
202-
203-
match err.0 {
204-
DbMessage::Callback(deferred, _) => {
205-
let err = cx.error(msg)?;
206-
deferred.reject(cx, err);
207-
Ok(())
208-
}
209-
DbMessage::Close => cx.throw_error("Expected DbMessage::Callback"),
125+
fn by_id<'cx>(&self, cx: &mut Cx<'cx>, id: f64) -> JsResult<'cx, JsPromise> {
126+
self.exec(cx, move |conn| -> Result<Option<String>, Error> {
127+
match conn
128+
.prepare("SELECT name FROM person WHERE id = ?")?
129+
.query_row(rusqlite::params![id], |row| row.get(0))
130+
{
131+
Ok(name) => Ok(Some(name)),
132+
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
133+
Err(err) => Err(Error::from(err)),
210134
}
211135
})
212136
}
213-
}
214-
215-
#[neon::main]
216-
fn main(mut cx: ModuleContext) -> NeonResult<()> {
217-
cx.export_function("databaseNew", Database::js_new)?;
218-
cx.export_function("databaseClose", Database::js_close)?;
219-
cx.export_function("databaseInsert", Database::js_insert)?;
220-
cx.export_function("databaseGetById", Database::js_get_by_id)?;
221137

222-
Ok(())
138+
// Idiomatic rust would take an owned `self` to prevent use after close
139+
// However, it's not possible to prevent JavaScript from continuing to hold a closed database
140+
fn close<'cx>(&self, cx: &mut Cx<'cx>) -> JsResult<'cx, JsPromise> {
141+
self.send(cx, |_| ControlFlow::<(), ()>::Break(()))
142+
}
223143
}

examples/async-sqlite/test/db.test.js

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
const assert = require("assert");
44

5-
const Database = require("..");
5+
const { Database } = require("..");
66

77
describe("SQLite Database", () => {
88
it("should insert and return name", async () => {
@@ -27,8 +27,7 @@ describe("SQLite Database", () => {
2727
it("should reject calls to a closed database", async () => {
2828
const db = new Database();
2929

30-
db.close();
31-
32-
await assert.rejects(() => db.byId(5), /closed channel/);
30+
await db.close();
31+
await assert.rejects(() => db.byId(5), /closed/);
3332
});
3433
});

0 commit comments

Comments
 (0)