diff --git a/Cargo.lock b/Cargo.lock index 7d267b9f1..a32a6ba66 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -92,7 +92,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -103,7 +103,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -430,9 +430,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.2.62" +version = "1.2.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1dce859f0832a7d088c4f1119888ab94ef4b5d6795d1ce05afb7fe159d79f98" +checksum = "556e016178bb5662a08681bbe0f00f8e17631781a4dfc8c45e466e4b185ec27f" dependencies = [ "find-msvc-tools", "jobserver", @@ -1020,7 +1020,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -1608,9 +1608,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.10.0" +version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb92f162bf56536459fc83c79b974bb12837acfed43d6bc370a7916d0ae15ecc" +checksum = "55281c53a1894c864990125767da440a4e630446785086f52523b20033b74498" dependencies = [ "atomic-waker", "bytes", @@ -1971,9 +1971,9 @@ checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" [[package]] name = "libredox" -version = "0.1.16" +version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e02f3bb43d335493c96bf3fd3a321600bf6bd07ed34bc64118e9293bdffea46c" +checksum = "f02ab6bace2054fb888a3c16f990117b579d14a3088e472d63c6011fa185c9d3" dependencies = [ "libc", ] @@ -2104,9 +2104,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" [[package]] name = "mio" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" +checksum = "02bd0af71c67b473010cbbc60715ee815645a4dc942899111f494b4b737d6fda" dependencies = [ "libc", "wasi 0.11.1+wasi-snapshot-preview1", @@ -2129,7 +2129,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -2598,9 +2598,9 @@ dependencies = [ [[package]] name = "redis" -version = "1.2.1" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72d32a1ac9123f0d84fda64bfc02a271d9868483162dd2d9099b5c362ece064c" +checksum = "a12e6b5f4d8ef33944e833e2b1859ad478deab6e431d7337b30ee2efe21f7543" dependencies = [ "arc-swap", "arcstr", @@ -2764,7 +2764,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -2832,7 +2832,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -3011,9 +3011,9 @@ dependencies = [ [[package]] name = "shlex" -version = "1.3.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +checksum = "f8fadd59c855ef2080decdef8ff161eb6661b86933c9d82e5ba29dc602a55aba" [[package]] name = "signal-hook-registry" @@ -3042,12 +3042,12 @@ dependencies = [ [[package]] name = "socket2" -version = "0.6.3" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" +checksum = "52d1cfed4120b4d927bf7c0f86d2087a4a7d6027c906d9f9d525a80573b9be51" dependencies = [ "libc", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -3056,6 +3056,29 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" +[[package]] +name = "streams-io-component-server" +version = "0.1.0" +dependencies = [ + "wit-bindgen 0.45.1", +] + +[[package]] +name = "streams-io-tcp-client" +version = "0.1.0" +dependencies = [ + "anyhow", + "bytes", + "clap", + "futures", + "tokio", + "tokio-stream", + "tracing", + "tracing-subscriber", + "wit-bindgen-wrpc", + "wrpc-transport", +] + [[package]] name = "streams-nats-client" version = "0.1.0" @@ -3221,7 +3244,7 @@ dependencies = [ "fastrand", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -3637,9 +3660,9 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "typenum" -version = "1.20.0" +version = "1.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40ce102ab67701b8526c123c1bab5cbe42d7040ccfd0f64af1a385808d2f43de" +checksum = "b6f5e870be6c3b371b77fe0ee0bafb859fa4964b4404c27de1d380043c4dda20" [[package]] name = "unicase" @@ -3707,9 +3730,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.23.1" +version = "1.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddd74a9687298c6858e9b88ec8935ec45d22e8fd5e6394fa1bd4e99a87789c76" +checksum = "d258b83ceec21034727ecee8c382cfa6c3e133699b0742c64571814fb420c9f7" dependencies = [ "getrandom 0.4.2", "js-sys", @@ -4051,16 +4074,6 @@ dependencies = [ "wasmparser 0.248.0", ] -[[package]] -name = "wasm-encoder" -version = "0.250.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2271adb766023046af314460f1fae02cc34ea16d736d93404d3b65be44270923" -dependencies = [ - "leb128fmt", - "wasmparser 0.250.0", -] - [[package]] name = "wasm-encoder" version = "0.251.0" @@ -4180,17 +4193,6 @@ dependencies = [ "serde", ] -[[package]] -name = "wasmparser" -version = "0.250.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "071d99cdfb8111603ed05500506c3298a940b58d609dd0259d3981785dd33556" -dependencies = [ - "bitflags", - "indexmap", - "semver", -] - [[package]] name = "wasmparser" version = "0.251.0" @@ -4549,22 +4551,22 @@ dependencies = [ [[package]] name = "wast" -version = "250.0.0" +version = "251.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69e9294a1f0204aeb5c47e95165517f43ef3cc895918c4f3e939380d4c290f4a" +checksum = "5cc7467dda0a96142eb2c980329dfb62480b1e1d3622fdeb1a44e2bca6ceed74" dependencies = [ "bumpalo", "leb128fmt", "memchr", "unicode-width", - "wasm-encoder 0.250.0", + "wasm-encoder 0.251.0", ] [[package]] name = "wat" -version = "1.250.0" +version = "1.251.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a549ed329a70e444e0f7796391ab2a87d0aef30ddde9f60e16e429224fafd02" +checksum = "81b1086c9e85b95bd6a229a928bc6c6d0662e42af0250c88d067b418831ea4d4" dependencies = [ "wast", ] @@ -4638,7 +4640,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -5648,18 +5650,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.48" +version = "0.8.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eed437bf9d6692032087e337407a86f04cd8d6a16a37199ed57949d415bd68e9" +checksum = "3b065d4f0e55f82fae73202e189638116a87c55ab6b8e6c2721e13dd9d854ad1" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.48" +version = "0.8.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70e3cd084b1788766f53af483dd21f93881ff30d7320490ec3ef7526d203bad4" +checksum = "0b631b19d36a892ab55420c92dbc83ccd79274f25be714855d3074aa71cab639" dependencies = [ "proc-macro2", "quote", diff --git a/crates/runtime-wasmtime/src/codec.rs b/crates/runtime-wasmtime/src/codec.rs index e4d67d4b6..d2f29924c 100644 --- a/crates/runtime-wasmtime/src/codec.rs +++ b/crates/runtime-wasmtime/src/codec.rs @@ -33,6 +33,9 @@ pub struct ValEncoder<'a, T: 'static, W> { pub store: StoreContextMut<'a, T>, pub ty: &'a Type, pub resources: &'a [ResourceType], + /// Resource types bridged to wRPC `stream` (`wasi:io` `input-stream`/ + /// `output-stream`), identified by their possibly-uninstantiated type. + pub io_streams: &'a [ResourceType], pub deferred: Option< Box Pin> + Send>> + Send>, >, @@ -44,11 +47,13 @@ impl ValEncoder<'_, T, W> { store: StoreContextMut<'a, T>, ty: &'a Type, resources: &'a [ResourceType], + io_streams: &'a [ResourceType], ) -> ValEncoder<'a, T, W> { ValEncoder { store, ty, resources, + io_streams, deferred: None, } } @@ -58,6 +63,7 @@ impl ValEncoder<'_, T, W> { store: self.store.as_context_mut(), ty, resources: self.resources, + io_streams: self.io_streams, deferred: None, } } @@ -442,7 +448,7 @@ where Ok(()) } (Val::Resource(resource), Type::Own(ty) | Type::Borrow(ty)) => { - if *ty == ResourceType::host::() { + if *ty == ResourceType::host::() || self.io_streams.contains(ty) { let stream = resource .try_into_resource::(&mut self.store) .context("failed to downcast `wasi:io/input-stream`")?; @@ -551,10 +557,12 @@ async fn read_flags(n: usize, r: &mut (impl AsyncRead + Unpin)) -> std::io::Resu /// Read encoded value of type [`Type`] from an [`AsyncRead`] into a [`Val`] #[instrument(level = "trace", skip_all, fields(ty, path))] +#[allow(clippy::too_many_arguments)] pub async fn read_value( store: &mut impl AsContextMut, r: &mut Pin<&mut R>, resources: &[ResourceType], + io_streams: &[ResourceType], val: &mut Val, ty: &Type, path: &[usize], @@ -640,7 +648,10 @@ where let mut v = Val::Bool(false); path.push(i); trace!(i, "reading list element value"); - Box::pin(read_value(store, r, resources, &mut v, &ty, &path)).await?; + Box::pin(read_value( + store, r, resources, io_streams, &mut v, &ty, &path, + )) + .await?; path.pop(); vs.push(v); } @@ -655,7 +666,10 @@ where let mut v = Val::Bool(false); path.push(i); trace!(i, "reading struct field value"); - Box::pin(read_value(store, r, resources, &mut v, &ty, &path)).await?; + Box::pin(read_value( + store, r, resources, io_streams, &mut v, &ty, &path, + )) + .await?; path.pop(); vs.push((name.to_string(), v)); } @@ -670,7 +684,10 @@ where let mut v = Val::Bool(false); path.push(i); trace!(i, "reading tuple element value"); - Box::pin(read_value(store, r, resources, &mut v, &ty, &path)).await?; + Box::pin(read_value( + store, r, resources, io_streams, &mut v, &ty, &path, + )) + .await?; path.pop(); vs.push(v); } @@ -692,7 +709,10 @@ where if let Some(ty) = ty { let mut v = Val::Bool(false); trace!(variant = name, "reading nested variant value"); - Box::pin(read_value(store, r, resources, &mut v, &ty, path)).await?; + Box::pin(read_value( + store, r, resources, io_streams, &mut v, &ty, path, + )) + .await?; *val = Val::Variant(name, Some(Box::new(v))); } else { *val = Val::Variant(name, None); @@ -718,7 +738,16 @@ where if ok { let mut v = Val::Bool(false); trace!("reading nested `option::some` value"); - Box::pin(read_value(store, r, resources, &mut v, &ty.ty(), path)).await?; + Box::pin(read_value( + store, + r, + resources, + io_streams, + &mut v, + &ty.ty(), + path, + )) + .await?; *val = Val::Option(Some(Box::new(v))); } else { *val = Val::Option(None); @@ -731,7 +760,10 @@ where if let Some(ty) = ty.ok() { let mut v = Val::Bool(false); trace!("reading nested `result::ok` value"); - Box::pin(read_value(store, r, resources, &mut v, &ty, path)).await?; + Box::pin(read_value( + store, r, resources, io_streams, &mut v, &ty, path, + )) + .await?; *val = Val::Result(Ok(Some(Box::new(v)))); } else { *val = Val::Result(Ok(None)); @@ -739,7 +771,10 @@ where } else if let Some(ty) = ty.err() { let mut v = Val::Bool(false); trace!("reading nested `result::err` value"); - Box::pin(read_value(store, r, resources, &mut v, &ty, path)).await?; + Box::pin(read_value( + store, r, resources, io_streams, &mut v, &ty, path, + )) + .await?; *val = Val::Result(Err(Some(Box::new(v)))); } else { *val = Val::Result(Err(None)); @@ -798,20 +833,24 @@ where Ok(()) } Type::Own(ty) | Type::Borrow(ty) => { - if *ty == ResourceType::host::() { + if *ty == ResourceType::host::() || io_streams.contains(ty) { let mut store = store.as_context_mut(); let r = r.index(path).map_err(std::io::Error::other)?; // TODO: Implement a custom reader, this approach ignores the stream end (`\0`), // which will could potentially break/hang with some transports + // The stream must be typed as `DynInputStream` (the host resource type), + // otherwise the resulting resource handle carries the concrete reader type + // and fails the guest's `own` type check. + let stream: DynInputStream = Box::new(AsyncReadStream::new( + FramedRead::new(r, ListDecoderU8::default()) + .into_async_read() + .compat(), + )); let res = store .data_mut() .wrpc() .table - .push(Box::new(AsyncReadStream::new( - FramedRead::new(r, ListDecoderU8::default()) - .into_async_read() - .compat(), - ))) + .push(stream) .map_err(|err| std::io::Error::new(std::io::ErrorKind::OutOfMemory, err))?; let v = res .try_into_resource_any(store) diff --git a/crates/runtime-wasmtime/src/lib.rs b/crates/runtime-wasmtime/src/lib.rs index e1c384722..d036ec01f 100644 --- a/crates/runtime-wasmtime/src/lib.rs +++ b/crates/runtime-wasmtime/src/lib.rs @@ -31,6 +31,7 @@ use crate::bindings::rpc::transport::{IncomingChannel, Invocation, OutgoingChann pub mod bindings; mod codec; +pub mod paths; mod polyfill; pub mod rpc; mod serve; @@ -383,6 +384,7 @@ pub async fn call( mut tx: O, guest_resources: &[ResourceType], host_resources: &HashMap, HashMap, (ResourceType, ResourceType)>>, + io_streams: &[ResourceType], params_ty: impl ExactSizeIterator, results_ty: &[Type], func: Func, @@ -396,10 +398,18 @@ where let mut params = vec![Val::Bool(false); params_ty.len()]; let mut rx = pin!(rx); for (i, (v, ty)) in zip(&mut params, params_ty).enumerate() { - read_value(&mut store, &mut rx, guest_resources, v, ty, &[i]) - .await - .with_context(|| format!("failed to decode parameter value {i}")) - .map_err(CallError::Decode)?; + read_value( + &mut store, + &mut rx, + guest_resources, + io_streams, + v, + ty, + &[i], + ) + .await + .with_context(|| format!("failed to decode parameter value {i}")) + .map_err(CallError::Decode)?; } let mut results = vec![Val::Bool(false); results_ty.len()]; func.call_async(&mut store, ¶ms, &mut results) @@ -415,7 +425,8 @@ where ) { (None, results) => { for (i, (v, ty)) in zip(results, results_ty).enumerate() { - let mut enc = ValEncoder::new(store.as_context_mut(), ty, guest_resources); + let mut enc = + ValEncoder::new(store.as_context_mut(), ty, guest_resources, io_streams); enc.encode(v, &mut buf) .with_context(|| format!("failed to encode result value {i}")) .map_err(CallError::Encode)?; @@ -426,7 +437,7 @@ where (Some(None), [Val::Result(Ok(None))]) => {} // `result` (Some(Some(ty)), [Val::Result(Ok(Some(v)))]) => { - let mut enc = ValEncoder::new(store.as_context_mut(), ty, guest_resources); + let mut enc = ValEncoder::new(store.as_context_mut(), ty, guest_resources, io_streams); enc.encode(v, &mut buf) .context("failed to encode result value 0") .map_err(CallError::Encode)?; diff --git a/crates/runtime-wasmtime/src/paths.rs b/crates/runtime-wasmtime/src/paths.rs new file mode 100644 index 000000000..16e79481d --- /dev/null +++ b/crates/runtime-wasmtime/src/paths.rs @@ -0,0 +1,171 @@ +//! Computation of wRPC asynchronous subscription paths from Wasmtime component types. +//! +//! When serving a component function over wRPC, the transport must be told which +//! nested positions of the parameters carry asynchronous values (streams and +//! futures) so it can subscribe to their data channels. `wasi:io` +//! `input-stream`/`output-stream` resources are bridged to wRPC `stream` by +//! this runtime, so they are treated as asynchronous here as well. +//! +//! Note that at serve time the component has not been instantiated, so a +//! `wasi:io` stream parameter appears as the component's own *uninstantiated* +//! resource type rather than the host [`DynInputStream`]/[`DynOutputStream`] +//! type. We therefore identify those resources by collecting the component's +//! `wasi:io/streams` imports up front and comparing against them. + +use std::collections::{BTreeMap, BTreeSet, VecDeque}; + +use wasmtime::component::types::{self, Type}; +use wasmtime::component::ResourceType; +use wasmtime::Engine; + +/// Collect the (uninstantiated) resource types of the component's imported +/// `wasi:io/streams` `input-stream` and `output-stream`, against which +/// parameter resource types can be compared at serve time. +/// +/// A `Vec` is used rather than a set because [`ResourceType`] is neither `Ord` +/// nor `Hash`; the collection holds at most two entries. +pub fn wasi_io_stream_resources( + engine: &Engine, + component: &types::Component, +) -> Vec { + let mut imports = BTreeMap::new(); + crate::collect_component_resource_imports(engine, component, &mut imports); + let mut out = Vec::new(); + for (instance, resources) in imports { + // Instance names are versioned, e.g. `wasi:io/streams@0.2.6`. + let base = instance.split('@').next().unwrap_or(&instance); + if base != "wasi:io/streams" { + continue; + } + for (name, ty) in resources { + if (&*name == "input-stream" || &*name == "output-stream") && !out.contains(&ty) { + out.push(ty); + } + } + } + out +} + +/// Compute the set of nested asynchronous paths within a single value type, and +/// whether the type *itself* is asynchronous (a stream or future). +/// +/// Mirrors `wrpc_introspect::async_paths_ty`, but operates over Wasmtime's +/// component [`Type`] rather than WIT types. `streams` is the set of resource +/// types that are bridged to wRPC `stream` (see [`wasi_io_stream_resources`]). +fn async_paths(ty: &Type, streams: &[ResourceType]) -> (BTreeSet>>, bool) { + let mut paths = BTreeSet::new(); + match ty { + Type::List(ty) => { + let (nested, fut) = async_paths(&ty.ty(), streams); + for mut path in nested { + path.push_front(None); + paths.insert(path); + } + if fut { + paths.insert(VecDeque::from([None])); + } + (paths, false) + } + Type::Option(ty) => async_paths(&ty.ty(), streams), + Type::Result(ty) => { + let mut is_fut = false; + if let Some(ty) = ty.ok() { + let (nested, fut) = async_paths(&ty, streams); + paths.extend(nested); + is_fut |= fut; + } + if let Some(ty) = ty.err() { + let (nested, fut) = async_paths(&ty, streams); + paths.extend(nested); + is_fut |= fut; + } + (paths, is_fut) + } + Type::Variant(ty) => { + let mut is_fut = false; + for case in ty.cases() { + if let Some(ty) = case.ty { + let (nested, fut) = async_paths(&ty, streams); + paths.extend(nested); + is_fut |= fut; + } + } + (paths, is_fut) + } + Type::Tuple(ty) => { + for (i, ty) in ty.types().enumerate() { + let (nested, fut) = async_paths(&ty, streams); + for mut path in nested { + path.push_front(Some(i)); + paths.insert(path); + } + if fut { + paths.insert(VecDeque::from([Some(i)])); + } + } + (paths, false) + } + Type::Record(ty) => { + for (i, field) in ty.fields().enumerate() { + let (nested, fut) = async_paths(&field.ty, streams); + for mut path in nested { + path.push_front(Some(i)); + paths.insert(path); + } + if fut { + paths.insert(VecDeque::from([Some(i)])); + } + } + (paths, false) + } + Type::Future(ty) => { + if let Some(ty) = ty.ty() { + (paths, _) = async_paths(&ty, streams); + } + (paths, true) + } + Type::Stream(ty) => { + if let Some(ty) = ty.ty() { + let (nested, fut) = async_paths(&ty, streams); + for mut path in nested { + path.push_front(None); + paths.insert(path); + } + if fut { + paths.insert(VecDeque::from([None])); + } + } + (paths, true) + } + Type::Own(ty) | Type::Borrow(ty) if streams.contains(ty) => { + // `wasi:io` streams are sent/received as wRPC `stream`. + (paths, true) + } + _ => (paths, false), + } +} + +/// Compute the wRPC subscription paths for a function's parameter list. +/// +/// Each parameter is treated as an element of a top-level tuple: a parameter at +/// index `i` whose type carries asynchronous data contributes paths prefixed +/// with `Some(i)`. +pub(crate) fn params_async_paths<'a>( + params: impl IntoIterator, + streams: &[ResourceType], +) -> Vec]>> { + let mut out = BTreeSet::new(); + for (i, ty) in params.into_iter().enumerate() { + let (nested, fut) = async_paths(ty, streams); + for mut path in nested { + path.push_front(Some(i)); + out.insert(path); + } + if fut { + out.insert(VecDeque::from([Some(i)])); + } + } + out.into_iter() + .map(|path| path.into_iter().collect::>()) + .collect() +} diff --git a/crates/runtime-wasmtime/src/polyfill.rs b/crates/runtime-wasmtime/src/polyfill.rs index 88c0d2334..9965c50dc 100644 --- a/crates/runtime-wasmtime/src/polyfill.rs +++ b/crates/runtime-wasmtime/src/polyfill.rs @@ -145,7 +145,7 @@ async fn invoke( let mut buf = BytesMut::default(); let mut deferred = vec![]; for (v, (name, ref ty)) in zip(params, params_ty) { - let mut enc = ValEncoder::new(store.as_context_mut(), ty, &guest_resources); + let mut enc = ValEncoder::new(store.as_context_mut(), ty, &guest_resources, &[]); enc.encode(v, &mut buf) .with_context(|| format!("failed to encode parameter `{name}`"))?; deferred.push(enc.deferred); @@ -198,9 +198,17 @@ async fn invoke( let rx = async { let mut incoming = pin!(incoming); for (i, (v, ref ty)) in zip(results, results_ty).enumerate() { - read_value(&mut store, &mut incoming, &guest_resources, v, ty, &[i]) - .await - .with_context(|| format!("failed to decode return value {i}"))?; + read_value( + &mut store, + &mut incoming, + &guest_resources, + &[], + v, + ty, + &[i], + ) + .await + .with_context(|| format!("failed to decode return value {i}"))?; } wasmtime::error::Ok(()) }; diff --git a/crates/runtime-wasmtime/src/serve.rs b/crates/runtime-wasmtime/src/serve.rs index ceb959a57..53e91f766 100644 --- a/crates/runtime-wasmtime/src/serve.rs +++ b/crates/runtime-wasmtime/src/serve.rs @@ -59,10 +59,17 @@ pub trait ServeExt: wrpc_transport::Serve { .get_export_index(idx.as_ref(), name) .with_context(|| format!("export `{name}` not found"))?; - // TODO: set paths - let invocations = self.serve(instance_name, rpc_func_name(name), []).await?; - let name = Arc::::from(name); let params_ty: Arc<[_]> = ty.params().map(|(_, ty)| ty).collect(); + let io_streams: Arc<[ResourceType]> = crate::paths::wasi_io_stream_resources( + component_ty.engine(), + &component_ty.component_type(), + ) + .into(); + let paths = crate::paths::params_async_paths(params_ty.iter(), &io_streams); + let invocations = self + .serve(instance_name, rpc_func_name(name), paths) + .await?; + let name = Arc::::from(name); let results_ty: Arc<[_]> = ty.results().collect(); let host_resources = Arc::clone(&host_resources); Ok(invocations.map_ok(move |(cx, tx, rx)| { @@ -71,6 +78,7 @@ pub trait ServeExt: wrpc_transport::Serve { let params_ty = Arc::clone(¶ms_ty); let results_ty = Arc::clone(&results_ty); let host_resources = Arc::clone(&host_resources); + let io_streams = Arc::clone(&io_streams); let mut store = store(); ( @@ -91,6 +99,7 @@ pub trait ServeExt: wrpc_transport::Serve { tx, &[], &host_resources, + &io_streams, params_ty.iter(), &results_ty, func, @@ -120,6 +129,7 @@ pub trait ServeExt: wrpc_transport::Serve { host_resources: impl Into< Arc, HashMap, (ResourceType, ResourceType)>>>, >, + io_stream_resources: impl Into>, ty: types::ComponentFunc, instance_name: &str, name: &str, @@ -140,6 +150,7 @@ pub trait ServeExt: wrpc_transport::Serve { let span = Span::current(); let guest_resources = guest_resources.into(); let host_resources = host_resources.into(); + let io_stream_resources = io_stream_resources.into(); async move { let func = { let mut store = store.lock().await; @@ -158,9 +169,11 @@ pub trait ServeExt: wrpc_transport::Serve { } .with_context(|| format!("function export `{name}` not found"))?; debug!(instance = instance_name, name, "serving function export"); - // TODO: set paths - let invocations = self.serve(instance_name, rpc_func_name(name), []).await?; let params_ty: Arc<[_]> = ty.params().map(|(_, ty)| ty).collect(); + let paths = crate::paths::params_async_paths(params_ty.iter(), &io_stream_resources); + let invocations = self + .serve(instance_name, rpc_func_name(name), paths) + .await?; let results_ty: Arc<[_]> = ty.results().collect(); let guest_resources = Arc::clone(&guest_resources); let host_resources = Arc::clone(&host_resources); @@ -169,6 +182,7 @@ pub trait ServeExt: wrpc_transport::Serve { let results_ty = Arc::clone(&results_ty); let guest_resources = Arc::clone(&guest_resources); let host_resources = Arc::clone(&host_resources); + let io_stream_resources = Arc::clone(&io_stream_resources); let store = Arc::clone(&store); ( cx, @@ -181,6 +195,7 @@ pub trait ServeExt: wrpc_transport::Serve { tx, &guest_resources, &host_resources, + &io_stream_resources, params_ty.iter(), &results_ty, func, diff --git a/crates/wasmtime-cli/src/lib.rs b/crates/wasmtime-cli/src/lib.rs index c870daac6..6104e0b8a 100644 --- a/crates/wasmtime-cli/src/lib.rs +++ b/crates/wasmtime-cli/src/lib.rs @@ -447,6 +447,12 @@ where .map_err(anyhow::Error::from) .context("failed to instantiate component")?; let engine = store.engine().clone(); + let io_stream_resources: Arc<[ResourceType]> = + wrpc_runtime_wasmtime::paths::wasi_io_stream_resources( + &engine, + &pre.component().component_type(), + ) + .into(); let store = Arc::new(Mutex::new(store)); for (name, ty) in pre.component().component_type().exports(&engine) { match (name, ty) { @@ -458,6 +464,7 @@ where instance, Arc::clone(&guest_resources), Arc::clone(&host_resources), + Arc::clone(&io_stream_resources), ty, "", name, @@ -505,6 +512,7 @@ where instance, Arc::clone(&guest_resources), Arc::clone(&host_resources), + Arc::clone(&io_stream_resources), ty, instance_name, name, diff --git a/examples/rust/streams-io-component-server/Cargo.toml b/examples/rust/streams-io-component-server/Cargo.toml new file mode 100644 index 000000000..ed8fe2fb0 --- /dev/null +++ b/examples/rust/streams-io-component-server/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "streams-io-component-server" +version = "0.1.0" + +authors.workspace = true +categories.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true + +[lib] +crate-type = ["cdylib"] + +[dependencies] +wit-bindgen = { workspace = true, features = ["realloc", "macros"] } diff --git a/examples/rust/streams-io-component-server/src/lib.rs b/examples/rust/streams-io-component-server/src/lib.rs new file mode 100644 index 000000000..62e7fe59e --- /dev/null +++ b/examples/rust/streams-io-component-server/src/lib.rs @@ -0,0 +1,37 @@ +mod bindings { + use crate::Handler; + + wit_bindgen::generate!({ + with: { + "wasi:io/error@0.2.0": generate, + "wasi:io/poll@0.2.0": generate, + "wasi:io/streams@0.2.0": generate, + } + }); + + export!(Handler); +} + +use bindings::exports::wrpc_examples::streams_io::handler::Guest; +use bindings::wasi::io::streams::{InputStream, StreamError}; + +pub struct Handler; + +impl Guest for Handler { + /// Read the entire input stream and return the total number of bytes read. + fn count(data: InputStream) -> u64 { + let mut total: u64 = 0; + loop { + // `blocking-read` waits for at least one byte, then returns up to `len`. + match data.blocking_read(8096) { + Ok(chunk) => total += chunk.len() as u64, + Err(StreamError::Closed) => return total, + Err(StreamError::LastOperationFailed(err)) => { + // Surface the failure on stderr; return what we have so far. + eprintln!("stream read failed: {}", err.to_debug_string()); + return total; + } + } + } + } +} diff --git a/examples/rust/streams-io-component-server/wit/deps/io/error.wit b/examples/rust/streams-io-component-server/wit/deps/io/error.wit new file mode 100644 index 000000000..22e5b6489 --- /dev/null +++ b/examples/rust/streams-io-component-server/wit/deps/io/error.wit @@ -0,0 +1,34 @@ +package wasi:io@0.2.0; + + +interface error { + /// A resource which represents some error information. + /// + /// The only method provided by this resource is `to-debug-string`, + /// which provides some human-readable information about the error. + /// + /// In the `wasi:io` package, this resource is returned through the + /// `wasi:io/streams/stream-error` type. + /// + /// To provide more specific error information, other interfaces may + /// provide functions to further "downcast" this error into more specific + /// error information. For example, `error`s returned in streams derived + /// from filesystem types to be described using the filesystem's own + /// error-code type, using the function + /// `wasi:filesystem/types/filesystem-error-code`, which takes a parameter + /// `borrow` and returns + /// `option`. + /// + /// The set of functions which can "downcast" an `error` into a more + /// concrete type is open. + resource error { + /// Returns a string that is suitable to assist humans in debugging + /// this error. + /// + /// WARNING: The returned string should not be consumed mechanically! + /// It may change across platforms, hosts, or other implementation + /// details. Parsing this string is a major platform-compatibility + /// hazard. + to-debug-string: func() -> string; + } +} diff --git a/examples/rust/streams-io-component-server/wit/deps/io/poll.wit b/examples/rust/streams-io-component-server/wit/deps/io/poll.wit new file mode 100644 index 000000000..ddc67f8b7 --- /dev/null +++ b/examples/rust/streams-io-component-server/wit/deps/io/poll.wit @@ -0,0 +1,41 @@ +package wasi:io@0.2.0; + +/// A poll API intended to let users wait for I/O events on multiple handles +/// at once. +interface poll { + /// `pollable` represents a single I/O event which may be ready, or not. + resource pollable { + + /// Return the readiness of a pollable. This function never blocks. + /// + /// Returns `true` when the pollable is ready, and `false` otherwise. + ready: func() -> bool; + + /// `block` returns immediately if the pollable is ready, and otherwise + /// blocks until ready. + /// + /// This function is equivalent to calling `poll.poll` on a list + /// containing only this pollable. + block: func(); + } + + /// Poll for completion on a set of pollables. + /// + /// This function takes a list of pollables, which identify I/O sources of + /// interest, and waits until one or more of the events is ready for I/O. + /// + /// The result `list` contains one or more indices of handles in the + /// argument list that is ready for I/O. + /// + /// If the list contains more elements than can be indexed with a `u32` + /// value, this function traps. + /// + /// A timeout can be implemented by adding a pollable from the + /// wasi-clocks API to the list. + /// + /// This function does not return a `result`; polling in itself does not + /// do any I/O so it doesn't fail. If any of the I/O sources identified by + /// the pollables has an error, it is indicated by marking the source as + /// being reaedy for I/O. + poll: func(in: list>) -> list; +} diff --git a/examples/rust/streams-io-component-server/wit/deps/io/streams.wit b/examples/rust/streams-io-component-server/wit/deps/io/streams.wit new file mode 100644 index 000000000..6d2f871e3 --- /dev/null +++ b/examples/rust/streams-io-component-server/wit/deps/io/streams.wit @@ -0,0 +1,262 @@ +package wasi:io@0.2.0; + +/// WASI I/O is an I/O abstraction API which is currently focused on providing +/// stream types. +/// +/// In the future, the component model is expected to add built-in stream types; +/// when it does, they are expected to subsume this API. +interface streams { + use error.{error}; + use poll.{pollable}; + + /// An error for input-stream and output-stream operations. + variant stream-error { + /// The last operation (a write or flush) failed before completion. + /// + /// More information is available in the `error` payload. + last-operation-failed(error), + /// The stream is closed: no more input will be accepted by the + /// stream. A closed output-stream will return this error on all + /// future operations. + closed + } + + /// An input bytestream. + /// + /// `input-stream`s are *non-blocking* to the extent practical on underlying + /// platforms. I/O operations always return promptly; if fewer bytes are + /// promptly available than requested, they return the number of bytes promptly + /// available, which could even be zero. To wait for data to be available, + /// use the `subscribe` function to obtain a `pollable` which can be polled + /// for using `wasi:io/poll`. + resource input-stream { + /// Perform a non-blocking read from the stream. + /// + /// When the source of a `read` is binary data, the bytes from the source + /// are returned verbatim. When the source of a `read` is known to the + /// implementation to be text, bytes containing the UTF-8 encoding of the + /// text are returned. + /// + /// This function returns a list of bytes containing the read data, + /// when successful. The returned list will contain up to `len` bytes; + /// it may return fewer than requested, but not more. The list is + /// empty when no bytes are available for reading at this time. The + /// pollable given by `subscribe` will be ready when more bytes are + /// available. + /// + /// This function fails with a `stream-error` when the operation + /// encounters an error, giving `last-operation-failed`, or when the + /// stream is closed, giving `closed`. + /// + /// When the caller gives a `len` of 0, it represents a request to + /// read 0 bytes. If the stream is still open, this call should + /// succeed and return an empty list, or otherwise fail with `closed`. + /// + /// The `len` parameter is a `u64`, which could represent a list of u8 which + /// is not possible to allocate in wasm32, or not desirable to allocate as + /// as a return value by the callee. The callee may return a list of bytes + /// less than `len` in size while more bytes are available for reading. + read: func( + /// The maximum number of bytes to read + len: u64 + ) -> result, stream-error>; + + /// Read bytes from a stream, after blocking until at least one byte can + /// be read. Except for blocking, behavior is identical to `read`. + blocking-read: func( + /// The maximum number of bytes to read + len: u64 + ) -> result, stream-error>; + + /// Skip bytes from a stream. Returns number of bytes skipped. + /// + /// Behaves identical to `read`, except instead of returning a list + /// of bytes, returns the number of bytes consumed from the stream. + skip: func( + /// The maximum number of bytes to skip. + len: u64, + ) -> result; + + /// Skip bytes from a stream, after blocking until at least one byte + /// can be skipped. Except for blocking behavior, identical to `skip`. + blocking-skip: func( + /// The maximum number of bytes to skip. + len: u64, + ) -> result; + + /// Create a `pollable` which will resolve once either the specified stream + /// has bytes available to read or the other end of the stream has been + /// closed. + /// The created `pollable` is a child resource of the `input-stream`. + /// Implementations may trap if the `input-stream` is dropped before + /// all derived `pollable`s created with this function are dropped. + subscribe: func() -> pollable; + } + + + /// An output bytestream. + /// + /// `output-stream`s are *non-blocking* to the extent practical on + /// underlying platforms. Except where specified otherwise, I/O operations also + /// always return promptly, after the number of bytes that can be written + /// promptly, which could even be zero. To wait for the stream to be ready to + /// accept data, the `subscribe` function to obtain a `pollable` which can be + /// polled for using `wasi:io/poll`. + resource output-stream { + /// Check readiness for writing. This function never blocks. + /// + /// Returns the number of bytes permitted for the next call to `write`, + /// or an error. Calling `write` with more bytes than this function has + /// permitted will trap. + /// + /// When this function returns 0 bytes, the `subscribe` pollable will + /// become ready when this function will report at least 1 byte, or an + /// error. + check-write: func() -> result; + + /// Perform a write. This function never blocks. + /// + /// When the destination of a `write` is binary data, the bytes from + /// `contents` are written verbatim. When the destination of a `write` is + /// known to the implementation to be text, the bytes of `contents` are + /// transcoded from UTF-8 into the encoding of the destination and then + /// written. + /// + /// Precondition: check-write gave permit of Ok(n) and contents has a + /// length of less than or equal to n. Otherwise, this function will trap. + /// + /// returns Err(closed) without writing if the stream has closed since + /// the last call to check-write provided a permit. + write: func( + contents: list + ) -> result<_, stream-error>; + + /// Perform a write of up to 4096 bytes, and then flush the stream. Block + /// until all of these operations are complete, or an error occurs. + /// + /// This is a convenience wrapper around the use of `check-write`, + /// `subscribe`, `write`, and `flush`, and is implemented with the + /// following pseudo-code: + /// + /// ```text + /// let pollable = this.subscribe(); + /// while !contents.is_empty() { + /// // Wait for the stream to become writable + /// pollable.block(); + /// let Ok(n) = this.check-write(); // eliding error handling + /// let len = min(n, contents.len()); + /// let (chunk, rest) = contents.split_at(len); + /// this.write(chunk ); // eliding error handling + /// contents = rest; + /// } + /// this.flush(); + /// // Wait for completion of `flush` + /// pollable.block(); + /// // Check for any errors that arose during `flush` + /// let _ = this.check-write(); // eliding error handling + /// ``` + blocking-write-and-flush: func( + contents: list + ) -> result<_, stream-error>; + + /// Request to flush buffered output. This function never blocks. + /// + /// This tells the output-stream that the caller intends any buffered + /// output to be flushed. the output which is expected to be flushed + /// is all that has been passed to `write` prior to this call. + /// + /// Upon calling this function, the `output-stream` will not accept any + /// writes (`check-write` will return `ok(0)`) until the flush has + /// completed. The `subscribe` pollable will become ready when the + /// flush has completed and the stream can accept more writes. + flush: func() -> result<_, stream-error>; + + /// Request to flush buffered output, and block until flush completes + /// and stream is ready for writing again. + blocking-flush: func() -> result<_, stream-error>; + + /// Create a `pollable` which will resolve once the output-stream + /// is ready for more writing, or an error has occured. When this + /// pollable is ready, `check-write` will return `ok(n)` with n>0, or an + /// error. + /// + /// If the stream is closed, this pollable is always ready immediately. + /// + /// The created `pollable` is a child resource of the `output-stream`. + /// Implementations may trap if the `output-stream` is dropped before + /// all derived `pollable`s created with this function are dropped. + subscribe: func() -> pollable; + + /// Write zeroes to a stream. + /// + /// This should be used precisely like `write` with the exact same + /// preconditions (must use check-write first), but instead of + /// passing a list of bytes, you simply pass the number of zero-bytes + /// that should be written. + write-zeroes: func( + /// The number of zero-bytes to write + len: u64 + ) -> result<_, stream-error>; + + /// Perform a write of up to 4096 zeroes, and then flush the stream. + /// Block until all of these operations are complete, or an error + /// occurs. + /// + /// This is a convenience wrapper around the use of `check-write`, + /// `subscribe`, `write-zeroes`, and `flush`, and is implemented with + /// the following pseudo-code: + /// + /// ```text + /// let pollable = this.subscribe(); + /// while num_zeroes != 0 { + /// // Wait for the stream to become writable + /// pollable.block(); + /// let Ok(n) = this.check-write(); // eliding error handling + /// let len = min(n, num_zeroes); + /// this.write-zeroes(len); // eliding error handling + /// num_zeroes -= len; + /// } + /// this.flush(); + /// // Wait for completion of `flush` + /// pollable.block(); + /// // Check for any errors that arose during `flush` + /// let _ = this.check-write(); // eliding error handling + /// ``` + blocking-write-zeroes-and-flush: func( + /// The number of zero-bytes to write + len: u64 + ) -> result<_, stream-error>; + + /// Read from one stream and write to another. + /// + /// The behavior of splice is equivelant to: + /// 1. calling `check-write` on the `output-stream` + /// 2. calling `read` on the `input-stream` with the smaller of the + /// `check-write` permitted length and the `len` provided to `splice` + /// 3. calling `write` on the `output-stream` with that read data. + /// + /// Any error reported by the call to `check-write`, `read`, or + /// `write` ends the splice and reports that error. + /// + /// This function returns the number of bytes transferred; it may be less + /// than `len`. + splice: func( + /// The stream to read from + src: borrow, + /// The number of bytes to splice + len: u64, + ) -> result; + + /// Read from one stream and write to another, with blocking. + /// + /// This is similar to `splice`, except that it blocks until the + /// `output-stream` is ready for writing, and the `input-stream` + /// is ready for reading, before performing the `splice`. + blocking-splice: func( + /// The stream to read from + src: borrow, + /// The number of bytes to splice + len: u64, + ) -> result; + } +} diff --git a/examples/rust/streams-io-component-server/wit/deps/io/world.wit b/examples/rust/streams-io-component-server/wit/deps/io/world.wit new file mode 100644 index 000000000..5f0b43fe5 --- /dev/null +++ b/examples/rust/streams-io-component-server/wit/deps/io/world.wit @@ -0,0 +1,6 @@ +package wasi:io@0.2.0; + +world imports { + import streams; + import poll; +} diff --git a/examples/rust/streams-io-component-server/wit/hello/hello.wit b/examples/rust/streams-io-component-server/wit/hello/hello.wit new file mode 100644 index 000000000..a5356eaff --- /dev/null +++ b/examples/rust/streams-io-component-server/wit/hello/hello.wit @@ -0,0 +1,15 @@ +package wrpc-examples:hello; + +interface handler { + use wrpc:rpc/error@0.1.0.{error}; + + hello: func() -> result; +} + +world client { + import handler; +} + +world server { + export handler; +} diff --git a/examples/rust/streams-io-component-server/wit/io/error.wit b/examples/rust/streams-io-component-server/wit/io/error.wit new file mode 100644 index 000000000..22e5b6489 --- /dev/null +++ b/examples/rust/streams-io-component-server/wit/io/error.wit @@ -0,0 +1,34 @@ +package wasi:io@0.2.0; + + +interface error { + /// A resource which represents some error information. + /// + /// The only method provided by this resource is `to-debug-string`, + /// which provides some human-readable information about the error. + /// + /// In the `wasi:io` package, this resource is returned through the + /// `wasi:io/streams/stream-error` type. + /// + /// To provide more specific error information, other interfaces may + /// provide functions to further "downcast" this error into more specific + /// error information. For example, `error`s returned in streams derived + /// from filesystem types to be described using the filesystem's own + /// error-code type, using the function + /// `wasi:filesystem/types/filesystem-error-code`, which takes a parameter + /// `borrow` and returns + /// `option`. + /// + /// The set of functions which can "downcast" an `error` into a more + /// concrete type is open. + resource error { + /// Returns a string that is suitable to assist humans in debugging + /// this error. + /// + /// WARNING: The returned string should not be consumed mechanically! + /// It may change across platforms, hosts, or other implementation + /// details. Parsing this string is a major platform-compatibility + /// hazard. + to-debug-string: func() -> string; + } +} diff --git a/examples/rust/streams-io-component-server/wit/io/poll.wit b/examples/rust/streams-io-component-server/wit/io/poll.wit new file mode 100644 index 000000000..ddc67f8b7 --- /dev/null +++ b/examples/rust/streams-io-component-server/wit/io/poll.wit @@ -0,0 +1,41 @@ +package wasi:io@0.2.0; + +/// A poll API intended to let users wait for I/O events on multiple handles +/// at once. +interface poll { + /// `pollable` represents a single I/O event which may be ready, or not. + resource pollable { + + /// Return the readiness of a pollable. This function never blocks. + /// + /// Returns `true` when the pollable is ready, and `false` otherwise. + ready: func() -> bool; + + /// `block` returns immediately if the pollable is ready, and otherwise + /// blocks until ready. + /// + /// This function is equivalent to calling `poll.poll` on a list + /// containing only this pollable. + block: func(); + } + + /// Poll for completion on a set of pollables. + /// + /// This function takes a list of pollables, which identify I/O sources of + /// interest, and waits until one or more of the events is ready for I/O. + /// + /// The result `list` contains one or more indices of handles in the + /// argument list that is ready for I/O. + /// + /// If the list contains more elements than can be indexed with a `u32` + /// value, this function traps. + /// + /// A timeout can be implemented by adding a pollable from the + /// wasi-clocks API to the list. + /// + /// This function does not return a `result`; polling in itself does not + /// do any I/O so it doesn't fail. If any of the I/O sources identified by + /// the pollables has an error, it is indicated by marking the source as + /// being reaedy for I/O. + poll: func(in: list>) -> list; +} diff --git a/examples/rust/streams-io-component-server/wit/io/streams.wit b/examples/rust/streams-io-component-server/wit/io/streams.wit new file mode 100644 index 000000000..6d2f871e3 --- /dev/null +++ b/examples/rust/streams-io-component-server/wit/io/streams.wit @@ -0,0 +1,262 @@ +package wasi:io@0.2.0; + +/// WASI I/O is an I/O abstraction API which is currently focused on providing +/// stream types. +/// +/// In the future, the component model is expected to add built-in stream types; +/// when it does, they are expected to subsume this API. +interface streams { + use error.{error}; + use poll.{pollable}; + + /// An error for input-stream and output-stream operations. + variant stream-error { + /// The last operation (a write or flush) failed before completion. + /// + /// More information is available in the `error` payload. + last-operation-failed(error), + /// The stream is closed: no more input will be accepted by the + /// stream. A closed output-stream will return this error on all + /// future operations. + closed + } + + /// An input bytestream. + /// + /// `input-stream`s are *non-blocking* to the extent practical on underlying + /// platforms. I/O operations always return promptly; if fewer bytes are + /// promptly available than requested, they return the number of bytes promptly + /// available, which could even be zero. To wait for data to be available, + /// use the `subscribe` function to obtain a `pollable` which can be polled + /// for using `wasi:io/poll`. + resource input-stream { + /// Perform a non-blocking read from the stream. + /// + /// When the source of a `read` is binary data, the bytes from the source + /// are returned verbatim. When the source of a `read` is known to the + /// implementation to be text, bytes containing the UTF-8 encoding of the + /// text are returned. + /// + /// This function returns a list of bytes containing the read data, + /// when successful. The returned list will contain up to `len` bytes; + /// it may return fewer than requested, but not more. The list is + /// empty when no bytes are available for reading at this time. The + /// pollable given by `subscribe` will be ready when more bytes are + /// available. + /// + /// This function fails with a `stream-error` when the operation + /// encounters an error, giving `last-operation-failed`, or when the + /// stream is closed, giving `closed`. + /// + /// When the caller gives a `len` of 0, it represents a request to + /// read 0 bytes. If the stream is still open, this call should + /// succeed and return an empty list, or otherwise fail with `closed`. + /// + /// The `len` parameter is a `u64`, which could represent a list of u8 which + /// is not possible to allocate in wasm32, or not desirable to allocate as + /// as a return value by the callee. The callee may return a list of bytes + /// less than `len` in size while more bytes are available for reading. + read: func( + /// The maximum number of bytes to read + len: u64 + ) -> result, stream-error>; + + /// Read bytes from a stream, after blocking until at least one byte can + /// be read. Except for blocking, behavior is identical to `read`. + blocking-read: func( + /// The maximum number of bytes to read + len: u64 + ) -> result, stream-error>; + + /// Skip bytes from a stream. Returns number of bytes skipped. + /// + /// Behaves identical to `read`, except instead of returning a list + /// of bytes, returns the number of bytes consumed from the stream. + skip: func( + /// The maximum number of bytes to skip. + len: u64, + ) -> result; + + /// Skip bytes from a stream, after blocking until at least one byte + /// can be skipped. Except for blocking behavior, identical to `skip`. + blocking-skip: func( + /// The maximum number of bytes to skip. + len: u64, + ) -> result; + + /// Create a `pollable` which will resolve once either the specified stream + /// has bytes available to read or the other end of the stream has been + /// closed. + /// The created `pollable` is a child resource of the `input-stream`. + /// Implementations may trap if the `input-stream` is dropped before + /// all derived `pollable`s created with this function are dropped. + subscribe: func() -> pollable; + } + + + /// An output bytestream. + /// + /// `output-stream`s are *non-blocking* to the extent practical on + /// underlying platforms. Except where specified otherwise, I/O operations also + /// always return promptly, after the number of bytes that can be written + /// promptly, which could even be zero. To wait for the stream to be ready to + /// accept data, the `subscribe` function to obtain a `pollable` which can be + /// polled for using `wasi:io/poll`. + resource output-stream { + /// Check readiness for writing. This function never blocks. + /// + /// Returns the number of bytes permitted for the next call to `write`, + /// or an error. Calling `write` with more bytes than this function has + /// permitted will trap. + /// + /// When this function returns 0 bytes, the `subscribe` pollable will + /// become ready when this function will report at least 1 byte, or an + /// error. + check-write: func() -> result; + + /// Perform a write. This function never blocks. + /// + /// When the destination of a `write` is binary data, the bytes from + /// `contents` are written verbatim. When the destination of a `write` is + /// known to the implementation to be text, the bytes of `contents` are + /// transcoded from UTF-8 into the encoding of the destination and then + /// written. + /// + /// Precondition: check-write gave permit of Ok(n) and contents has a + /// length of less than or equal to n. Otherwise, this function will trap. + /// + /// returns Err(closed) without writing if the stream has closed since + /// the last call to check-write provided a permit. + write: func( + contents: list + ) -> result<_, stream-error>; + + /// Perform a write of up to 4096 bytes, and then flush the stream. Block + /// until all of these operations are complete, or an error occurs. + /// + /// This is a convenience wrapper around the use of `check-write`, + /// `subscribe`, `write`, and `flush`, and is implemented with the + /// following pseudo-code: + /// + /// ```text + /// let pollable = this.subscribe(); + /// while !contents.is_empty() { + /// // Wait for the stream to become writable + /// pollable.block(); + /// let Ok(n) = this.check-write(); // eliding error handling + /// let len = min(n, contents.len()); + /// let (chunk, rest) = contents.split_at(len); + /// this.write(chunk ); // eliding error handling + /// contents = rest; + /// } + /// this.flush(); + /// // Wait for completion of `flush` + /// pollable.block(); + /// // Check for any errors that arose during `flush` + /// let _ = this.check-write(); // eliding error handling + /// ``` + blocking-write-and-flush: func( + contents: list + ) -> result<_, stream-error>; + + /// Request to flush buffered output. This function never blocks. + /// + /// This tells the output-stream that the caller intends any buffered + /// output to be flushed. the output which is expected to be flushed + /// is all that has been passed to `write` prior to this call. + /// + /// Upon calling this function, the `output-stream` will not accept any + /// writes (`check-write` will return `ok(0)`) until the flush has + /// completed. The `subscribe` pollable will become ready when the + /// flush has completed and the stream can accept more writes. + flush: func() -> result<_, stream-error>; + + /// Request to flush buffered output, and block until flush completes + /// and stream is ready for writing again. + blocking-flush: func() -> result<_, stream-error>; + + /// Create a `pollable` which will resolve once the output-stream + /// is ready for more writing, or an error has occured. When this + /// pollable is ready, `check-write` will return `ok(n)` with n>0, or an + /// error. + /// + /// If the stream is closed, this pollable is always ready immediately. + /// + /// The created `pollable` is a child resource of the `output-stream`. + /// Implementations may trap if the `output-stream` is dropped before + /// all derived `pollable`s created with this function are dropped. + subscribe: func() -> pollable; + + /// Write zeroes to a stream. + /// + /// This should be used precisely like `write` with the exact same + /// preconditions (must use check-write first), but instead of + /// passing a list of bytes, you simply pass the number of zero-bytes + /// that should be written. + write-zeroes: func( + /// The number of zero-bytes to write + len: u64 + ) -> result<_, stream-error>; + + /// Perform a write of up to 4096 zeroes, and then flush the stream. + /// Block until all of these operations are complete, or an error + /// occurs. + /// + /// This is a convenience wrapper around the use of `check-write`, + /// `subscribe`, `write-zeroes`, and `flush`, and is implemented with + /// the following pseudo-code: + /// + /// ```text + /// let pollable = this.subscribe(); + /// while num_zeroes != 0 { + /// // Wait for the stream to become writable + /// pollable.block(); + /// let Ok(n) = this.check-write(); // eliding error handling + /// let len = min(n, num_zeroes); + /// this.write-zeroes(len); // eliding error handling + /// num_zeroes -= len; + /// } + /// this.flush(); + /// // Wait for completion of `flush` + /// pollable.block(); + /// // Check for any errors that arose during `flush` + /// let _ = this.check-write(); // eliding error handling + /// ``` + blocking-write-zeroes-and-flush: func( + /// The number of zero-bytes to write + len: u64 + ) -> result<_, stream-error>; + + /// Read from one stream and write to another. + /// + /// The behavior of splice is equivelant to: + /// 1. calling `check-write` on the `output-stream` + /// 2. calling `read` on the `input-stream` with the smaller of the + /// `check-write` permitted length and the `len` provided to `splice` + /// 3. calling `write` on the `output-stream` with that read data. + /// + /// Any error reported by the call to `check-write`, `read`, or + /// `write` ends the splice and reports that error. + /// + /// This function returns the number of bytes transferred; it may be less + /// than `len`. + splice: func( + /// The stream to read from + src: borrow, + /// The number of bytes to splice + len: u64, + ) -> result; + + /// Read from one stream and write to another, with blocking. + /// + /// This is similar to `splice`, except that it blocks until the + /// `output-stream` is ready for writing, and the `input-stream` + /// is ready for reading, before performing the `splice`. + blocking-splice: func( + /// The stream to read from + src: borrow, + /// The number of bytes to splice + len: u64, + ) -> result; + } +} diff --git a/examples/rust/streams-io-component-server/wit/io/world.wit b/examples/rust/streams-io-component-server/wit/io/world.wit new file mode 100644 index 000000000..5f0b43fe5 --- /dev/null +++ b/examples/rust/streams-io-component-server/wit/io/world.wit @@ -0,0 +1,6 @@ +package wasi:io@0.2.0; + +world imports { + import streams; + import poll; +} diff --git a/examples/rust/streams-io-component-server/wit/rpc/rpc.wit b/examples/rust/streams-io-component-server/wit/rpc/rpc.wit new file mode 100644 index 000000000..dd3c21a91 --- /dev/null +++ b/examples/rust/streams-io-component-server/wit/rpc/rpc.wit @@ -0,0 +1,67 @@ +package wrpc:rpc@0.1.0; + +interface error { + use wasi:io/error@0.2.0.{error as io-error}; + + /// A resource which represents some error information. + /// + /// The only method provided by this resource is `to-debug-string`, + /// which provides some human-readable information about the error. + resource error { + /// Attempts to convert `wasi:io/error.error` into `error`. + /// + /// Returns the original `wasi:io/error.error` in case of mismatch. + from-io-error: static func(error: io-error) -> result; + + /// Returns a string that is suitable to assist humans in debugging + /// this error. + /// + /// WARNING: The returned string should not be consumed mechanically! + /// It may change across platforms, hosts, or other implementation + /// details. Parsing this string is a major platform-compatibility + /// hazard. + to-debug-string: func() -> string; + } +} + +interface context { + resource context { + default: static func() -> context; + } +} + +interface transport { + use wasi:io/poll@0.2.0.{pollable}; + use wasi:io/streams@0.2.0.{input-stream, output-stream}; + + use error.{error}; + + resource incoming-channel { + data: func() -> option; + index: func(path: list) -> result; + } + + resource outgoing-channel { + data: func() -> option; + index: func(path: list) -> result; + } + + resource invocation { + /// Returns a pollable, which will be ready once the invocation has been transmitted + subscribe: func() -> pollable; + /// Finish will consume this invocation returning an error, if any. + finish: static func(this: invocation) -> result, error>; + } +} + +interface invoker { + use context.{context}; + use transport.{invocation}; + + /// Asynchronously invoke a function. + invoke: func(cx: context, instance: string, name: string, params: list, paths: list>>) -> invocation; +} + +world imports { + import invoker; +} diff --git a/examples/rust/streams-io-component-server/wit/world.wit b/examples/rust/streams-io-component-server/wit/world.wit new file mode 100644 index 000000000..35b069b57 --- /dev/null +++ b/examples/rust/streams-io-component-server/wit/world.wit @@ -0,0 +1,12 @@ +package wrpc-examples:streams-io; + +interface handler { + use wasi:io/streams@0.2.0.{input-stream}; + + /// Read the entire input stream and return the total number of bytes read. + count: func(data: input-stream) -> u64; +} + +world server { + export handler; +} diff --git a/examples/rust/streams-io-tcp-client/Cargo.toml b/examples/rust/streams-io-tcp-client/Cargo.toml new file mode 100644 index 000000000..4de8e0797 --- /dev/null +++ b/examples/rust/streams-io-tcp-client/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "streams-io-tcp-client" +version = "0.1.0" + +authors.workspace = true +categories.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true + +[dependencies] +anyhow = { workspace = true, features = ["std"] } +bytes = { workspace = true } +clap = { workspace = true, features = [ + "color", + "derive", + "error-context", + "help", + "std", + "suggestions", + "usage", +] } +futures = { workspace = true } +tokio = { workspace = true, features = ["rt-multi-thread"] } +tokio-stream = { workspace = true, features = ["time"] } +tracing = { workspace = true } +tracing-subscriber = { workspace = true, features = [ + "ansi", + "env-filter", + "fmt", +] } +wit-bindgen-wrpc = { workspace = true } +wrpc-transport = { workspace = true, features = ["net"] } diff --git a/examples/rust/streams-io-tcp-client/src/main.rs b/examples/rust/streams-io-tcp-client/src/main.rs new file mode 100644 index 000000000..bb585d4d4 --- /dev/null +++ b/examples/rust/streams-io-tcp-client/src/main.rs @@ -0,0 +1,61 @@ +use anyhow::Context as _; +use bytes::Bytes; +use clap::Parser; +use futures::stream; +use tokio::try_join; +use tracing::debug; + +mod bindings { + wit_bindgen_wrpc::generate!({ + with: { + "wrpc-examples:streams-io/handler": generate + } + }); +} + +use bindings::wrpc_examples::streams_io::handler::count; + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Address to invoke `wrpc-examples:streams-io/handler.count` on + #[arg(default_value = "[::1]:7761")] + addr: String, + + /// Bytes to send through the stream + #[arg(default_value = "hello from a wRPC stream")] + payload: String, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt().init(); + + let Args { addr, payload } = Args::parse(); + + let wrpc = wrpc_transport::tcp::Client::from(&addr); + + // A `stream` is chunked using [`Bytes`]. The component server reads it + // as a `wasi:io/streams.input-stream` and counts the bytes. + let data = Box::pin(stream::iter([Bytes::from(payload.clone().into_bytes())])); + + let (total, io) = count(&wrpc, (), data) + .await + .context("failed to invoke `wrpc-examples:streams-io/handler.count`")?; + + try_join!( + async { + if let Some(io) = io { + debug!("performing async I/O"); + io.await.context("failed to complete async I/O") + } else { + Ok(()) + } + }, + async { + eprintln!("sent {} bytes, server counted {total}", payload.len()); + Ok(()) + } + )?; + Ok(()) +} diff --git a/examples/rust/streams-io-tcp-client/wit/world.wit b/examples/rust/streams-io-tcp-client/wit/world.wit new file mode 100644 index 000000000..526eef64f --- /dev/null +++ b/examples/rust/streams-io-tcp-client/wit/world.wit @@ -0,0 +1,12 @@ +package wrpc-examples:streams-io; + +interface handler { + /// Same function as the component server exports, but viewed over the wire: + /// the component's `wasi:io/streams.input-stream` parameter is bridged to a + /// native wRPC `stream` by `wrpc-runtime-wasmtime`. + count: func(data: stream) -> u64; +} + +world client { + import handler; +}