Skip to content

Commit 866dbff

Browse files
authored
Merge pull request #3423 from itowlson/kv-sqlite-async-who-will-rid-me-of-this-turbulent-git
Key-value and SQLite async
2 parents c0190b9 + 3f0ced5 commit 866dbff

27 files changed

Lines changed: 891 additions & 60 deletions

File tree

Cargo.lock

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/factor-key-value/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ spin-factors = { path = "../factors" }
1313
spin-locked-app = { path = "../locked-app" }
1414
spin-resource-table = { path = "../table" }
1515
spin-telemetry = { path = "../telemetry" }
16+
spin-wasi-async = { path = "../wasi-async" }
1617
spin-world = { path = "../world" }
1718
thiserror = { workspace = true }
1819
tokio = { workspace = true, features = ["macros", "sync", "rt"] }

crates/factor-key-value/src/host.rs

Lines changed: 170 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
use super::{Cas, SwapError};
22
use anyhow::{Context, Result};
3-
use spin_core::{async_trait, wasmtime::component::Resource};
3+
use spin_core::{
4+
async_trait,
5+
wasmtime::component::{Accessor, FutureReader, Resource, StreamReader},
6+
};
47
use spin_factor_otel::OtelFactorState;
58
use spin_resource_table::Table;
69
use spin_telemetry::traces::{self, Blame};
10+
use spin_world::spin::key_value::key_value as v3;
711
use spin_world::v2::key_value;
812
use spin_world::wasi::keyvalue as wasi_keyvalue;
913
use spin_world::MAX_HOST_BUFFERED_BYTES;
@@ -43,6 +47,13 @@ pub trait Store: Sync + Send {
4347
async fn delete(&self, key: &str) -> Result<(), Error>;
4448
async fn exists(&self, key: &str) -> Result<bool, Error>;
4549
async fn get_keys(&self, max_result_bytes: usize) -> Result<Vec<String>, Error>;
50+
async fn get_keys_async(
51+
&self,
52+
max_result_bytes: usize,
53+
) -> (
54+
tokio::sync::mpsc::Receiver<String>,
55+
tokio::sync::oneshot::Receiver<Result<(), v3::Error>>,
56+
);
4657
async fn get_many(
4758
&self,
4859
keys: Vec<String>,
@@ -220,6 +231,140 @@ impl key_value::HostStore for KeyValueDispatch {
220231
}
221232
}
222233

234+
impl spin_core::wasmtime::component::HasData for KeyValueDispatch {
235+
type Data<'a> = &'a mut KeyValueDispatch;
236+
}
237+
238+
impl v3::Host for KeyValueDispatch {}
239+
240+
impl v3::HostStore for KeyValueDispatch {
241+
async fn drop(&mut self, store: Resource<v3::Store>) -> Result<()> {
242+
self.stores.remove(store.rep());
243+
Ok(())
244+
}
245+
}
246+
247+
impl v3::HostStoreWithStore for crate::KeyValueFactorData {
248+
async fn open<T>(
249+
accessor: &Accessor<T, Self>,
250+
label: String,
251+
) -> Result<Result<Resource<v3::Store>, v3::Error>> {
252+
let (allowed, manager) = accessor.with(|mut access| {
253+
let host = access.get();
254+
host.otel.reparent_tracing_span();
255+
(host.allowed_stores.contains(&label), host.manager.clone())
256+
});
257+
258+
if !allowed {
259+
return Ok(Err(v3::Error::AccessDenied));
260+
}
261+
262+
let store = manager.get(&label).await?;
263+
store.after_open().await?;
264+
265+
let rsrc = accessor.with(|mut access| {
266+
let host = access.get();
267+
host.stores
268+
.push(store)
269+
.map(Resource::new_own)
270+
.map_err(|()| v3::Error::StoreTableFull)
271+
});
272+
273+
Ok(rsrc)
274+
}
275+
276+
async fn get<T>(
277+
accessor: &Accessor<T, Self>,
278+
store: Resource<v3::Store>,
279+
key: String,
280+
) -> Result<Result<Option<Vec<u8>>, v3::Error>> {
281+
let store = accessor.with(|mut access| {
282+
let host = access.get();
283+
host.otel.reparent_tracing_span();
284+
host.get_store(store).cloned()
285+
})?;
286+
Ok(store
287+
.get(&key, MAX_HOST_BUFFERED_BYTES)
288+
.await
289+
.map_err(to_v3_err)
290+
.map_err(track_error_on_span_v3))
291+
}
292+
293+
async fn set<T>(
294+
accessor: &Accessor<T, Self>,
295+
store: Resource<v3::Store>,
296+
key: String,
297+
value: Vec<u8>,
298+
) -> Result<Result<(), v3::Error>> {
299+
let store = accessor.with(|mut access| {
300+
let host = access.get();
301+
host.otel.reparent_tracing_span();
302+
host.get_store(store).cloned()
303+
})?;
304+
Ok(store
305+
.set(&key, &value)
306+
.await
307+
.map_err(to_v3_err)
308+
.map_err(track_error_on_span_v3))
309+
}
310+
311+
async fn delete<T>(
312+
accessor: &Accessor<T, Self>,
313+
store: Resource<v3::Store>,
314+
key: String,
315+
) -> Result<Result<(), v3::Error>> {
316+
let store = accessor.with(|mut access| {
317+
let host = access.get();
318+
host.otel.reparent_tracing_span();
319+
host.get_store(store).cloned()
320+
})?;
321+
Ok(store
322+
.delete(&key)
323+
.await
324+
.map_err(to_v3_err)
325+
.map_err(track_error_on_span_v3))
326+
}
327+
328+
async fn exists<T>(
329+
accessor: &Accessor<T, Self>,
330+
store: Resource<v3::Store>,
331+
key: String,
332+
) -> Result<Result<bool, v3::Error>> {
333+
let store = accessor.with(|mut access| {
334+
let host = access.get();
335+
host.otel.reparent_tracing_span();
336+
host.get_store(store).cloned()
337+
})?;
338+
Ok(store
339+
.exists(&key)
340+
.await
341+
.map_err(to_v3_err)
342+
.map_err(track_error_on_span_v3))
343+
}
344+
345+
async fn get_keys<T>(
346+
accessor: &Accessor<T, Self>,
347+
store: Resource<v3::Store>,
348+
) -> Result<(StreamReader<String>, FutureReader<Result<(), v3::Error>>)> {
349+
let store = accessor.with(|mut access| {
350+
let host = access.get();
351+
host.otel.reparent_tracing_span();
352+
host.get_store(store).cloned()
353+
})?;
354+
355+
let (keys_rx, err_rx) = store.get_keys_async(MAX_HOST_BUFFERED_BYTES).await;
356+
357+
let producer = spin_wasi_async::stream::producer(keys_rx);
358+
let (ksr, efr) = accessor.with(|mut access| {
359+
let ksr = StreamReader::new(&mut access, producer);
360+
let efr = FutureReader::new(&mut access, err_rx);
361+
(ksr, efr)
362+
});
363+
364+
Ok((ksr, efr))
365+
}
366+
}
367+
223368
/// Make sure that infrastructure related errors are tracked in the current span.
224369
fn track_error_on_span(err: Error) -> Error {
225370
let blame = match err {
@@ -230,6 +375,16 @@ fn track_error_on_span(err: Error) -> Error {
230375
err
231376
}
232377

378+
/// Make sure that infrastructure related errors are tracked in the current span.
379+
fn track_error_on_span_v3(err: v3::Error) -> v3::Error {
380+
let blame = match err {
381+
v3::Error::NoSuchStore | v3::Error::AccessDenied => Blame::Guest,
382+
v3::Error::StoreTableFull | v3::Error::Other(_) => Blame::Host,
383+
};
384+
traces::mark_as_error(&err, Some(blame));
385+
err
386+
}
387+
233388
fn to_wasi_err(e: Error) -> wasi_keyvalue::store::Error {
234389
match track_error_on_span(e) {
235390
Error::AccessDenied => wasi_keyvalue::store::Error::AccessDenied,
@@ -239,6 +394,15 @@ fn to_wasi_err(e: Error) -> wasi_keyvalue::store::Error {
239394
}
240395
}
241396

397+
pub fn to_v3_err(e: Error) -> v3::Error {
398+
match track_error_on_span(e) {
399+
Error::AccessDenied => v3::Error::AccessDenied,
400+
Error::NoSuchStore => v3::Error::NoSuchStore,
401+
Error::StoreTableFull => v3::Error::StoreTableFull,
402+
Error::Other(msg) => v3::Error::Other(msg),
403+
}
404+
}
405+
242406
impl wasi_keyvalue::store::Host for KeyValueDispatch {
243407
#[instrument(name = "wasi_key_value.open", skip_all, fields(otel.kind = "client"))]
244408
async fn open(
@@ -484,6 +648,11 @@ pub fn log_error(err: impl std::fmt::Debug) -> Error {
484648
Error::Other(format!("{err:?}"))
485649
}
486650

651+
pub fn log_error_v3(err: impl std::fmt::Debug) -> v3::Error {
652+
tracing::warn!("key-value error: {err:?}");
653+
v3::Error::Other(format!("{err:?}"))
654+
}
655+
487656
pub fn log_cas_error(err: impl std::fmt::Debug) -> SwapError {
488657
tracing::warn!("key-value error: {err:?}");
489658
SwapError::Other(format!("{err:?}"))

crates/factor-key-value/src/lib.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,13 @@ use spin_locked_app::MetadataKey;
1717

1818
/// Metadata key for key-value stores.
1919
pub const KEY_VALUE_STORES_KEY: MetadataKey<Vec<String>> = MetadataKey::new("key_value_stores");
20-
pub use host::{log_cas_error, log_error, Error, KeyValueDispatch, Store, StoreManager};
20+
pub use host::to_v3_err;
21+
pub use host::{
22+
log_cas_error, log_error, log_error_v3, Error, KeyValueDispatch, Store, StoreManager,
23+
};
2124
pub use runtime_config::RuntimeConfig;
2225
use spin_core::async_trait;
26+
pub use spin_world::spin::key_value::key_value as v3;
2327
pub use util::DelegatingStoreManager;
2428

2529
/// A factor that provides key-value storage.
@@ -43,6 +47,9 @@ impl Factor for KeyValueFactor {
4347
fn init(&mut self, ctx: &mut impl InitContext<Self>) -> anyhow::Result<()> {
4448
ctx.link_bindings(spin_world::v1::key_value::add_to_linker::<_, FactorData<Self>>)?;
4549
ctx.link_bindings(spin_world::v2::key_value::add_to_linker::<_, FactorData<Self>>)?;
50+
ctx.link_bindings(
51+
spin_world::spin::key_value::key_value::add_to_linker::<_, KeyValueFactorData>,
52+
)?;
4653
ctx.link_bindings(spin_world::wasi::keyvalue::store::add_to_linker::<_, FactorData<Self>>)?;
4754
ctx.link_bindings(spin_world::wasi::keyvalue::batch::add_to_linker::<_, FactorData<Self>>)?;
4855
ctx.link_bindings(
@@ -200,3 +207,9 @@ impl FactorInstanceBuilder for InstanceBuilder {
200207
))
201208
}
202209
}
210+
211+
pub struct KeyValueFactorData(KeyValueFactor);
212+
213+
impl spin_core::wasmtime::component::HasData for KeyValueFactorData {
214+
type Data<'a> = &'a mut KeyValueDispatch;
215+
}

crates/factor-key-value/tests/factor_test.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use anyhow::bail;
22
use spin_core::async_trait;
3-
use spin_factor_key_value::{Cas, KeyValueFactor, RuntimeConfig, Store, StoreManager};
3+
use spin_factor_key_value::{v3, Cas, KeyValueFactor, RuntimeConfig, Store, StoreManager};
44
use spin_factors::RuntimeFactors;
55
use spin_factors_test::{toml, TestEnvironment};
66
use spin_world::v2::key_value::{Error, HostStore};
@@ -141,6 +141,15 @@ impl Store for MockStore {
141141
let _ = max_result_bytes;
142142
todo!()
143143
}
144+
async fn get_keys_async(
145+
&self,
146+
_max_result_bytes: usize,
147+
) -> (
148+
tokio::sync::mpsc::Receiver<String>,
149+
tokio::sync::oneshot::Receiver<Result<(), v3::Error>>,
150+
) {
151+
todo!()
152+
}
144153

145154
async fn get_many(
146155
&self,

crates/factor-outbound-pg/src/host.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use anyhow::Result;
2-
use spin_core::wasmtime;
32
use spin_core::wasmtime::component::{Accessor, FutureReader, Resource, StreamReader};
43
use spin_world::spin::postgres3_0_0::postgres::{self as v3};
54
use spin_world::spin::postgres4_2_0::postgres::{self as v4};
@@ -243,8 +242,6 @@ impl<CF: ClientFactory> spin_world::spin::postgres4_2_0::postgres::HostConnectio
243242
),
244243
v4::Error,
245244
> {
246-
use wasmtime::AsContextMut;
247-
248245
let client = accessor.with(|mut access| {
249246
let host = access.get();
250247
host.connections.get(connection.rep()).unwrap().clone()
@@ -261,8 +258,8 @@ impl<CF: ClientFactory> spin_world::spin::postgres4_2_0::postgres::HostConnectio
261258
let row_producer = spin_wasi_async::stream::producer(rows);
262259

263260
let (sr, efr) = accessor.with(|mut access| {
264-
let sr = StreamReader::new(access.as_context_mut(), row_producer);
265-
let efr = FutureReader::new(access.as_context_mut(), error);
261+
let sr = StreamReader::new(&mut access, row_producer);
262+
let efr = FutureReader::new(&mut access, error);
266263
(sr, efr)
267264
});
268265

crates/factor-sqlite/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@ rust-version.workspace = true
1010

1111
[dependencies]
1212
async-trait = { workspace = true }
13+
spin-core = { path = "../core" }
1314
spin-factor-otel = { path = "../factor-otel" }
1415
spin-factors = { path = "../factors" }
1516
spin-locked-app = { path = "../locked-app" }
1617
spin-resource-table = { path = "../table" }
18+
spin-wasi-async = { path = "../wasi-async" }
1719
spin-world = { path = "../world" }
1820
tokio = { workspace = true }
1921
tracing = { workspace = true }

0 commit comments

Comments
 (0)