Skip to content

Commit 625a3e3

Browse files
authored
async: add stream/future producers and blanket impl for Future (#11684)
* async: add `EmptyProducer` and blanket impl for `Future` Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * move more future/stream producers to `wasmtime` crate Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * async: allow blanket `Future` impl to trap Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * async: remove the need for `{Ready,Empty}Producer` Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> --------- Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
1 parent 5245e1f commit 625a3e3

5 files changed

Lines changed: 274 additions & 131 deletions

File tree

crates/wasi-http/src/p3/body.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::p3::bindings::http::types::{ErrorCode, Fields, Trailers};
22
use crate::p3::{WasiHttp, WasiHttpCtxView};
33
use anyhow::Context as _;
44
use bytes::Bytes;
5+
use core::iter;
56
use core::num::NonZeroUsize;
67
use core::pin::Pin;
78
use core::task::{Context, Poll, ready};
@@ -17,7 +18,6 @@ use wasmtime::component::{
1718
StreamProducer, StreamReader, StreamResult,
1819
};
1920
use wasmtime::{AsContextMut, StoreContextMut};
20-
use wasmtime_wasi::p3::{FutureOneshotProducer, StreamEmptyProducer};
2121

2222
/// The concrete type behind a `wasi:http/types/body` resource.
2323
pub(crate) enum Body {
@@ -75,7 +75,7 @@ impl Body {
7575
// https://github.com/WebAssembly/wasi-http/issues/176
7676
_ = result_tx.send(Box::new(async { Ok(()) }));
7777
Ok((
78-
StreamReader::new(instance, &mut store, StreamEmptyProducer::default()),
78+
StreamReader::new(instance, &mut store, iter::empty()),
7979
trailers_rx,
8080
))
8181
}
@@ -95,11 +95,7 @@ impl Body {
9595
getter,
9696
},
9797
),
98-
FutureReader::new(
99-
instance,
100-
&mut store,
101-
FutureOneshotProducer::from(trailers_rx),
102-
),
98+
FutureReader::new(instance, &mut store, trailers_rx),
10399
))
104100
}
105101
Body::Consumed => Err(()),

crates/wasi/src/p3/filesystem/host.rs

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,13 @@ use crate::p3::bindings::filesystem::types::{
55
Filesize, MetadataHashValue, NewTimestamp, OpenFlags, PathFlags,
66
};
77
use crate::p3::filesystem::{FilesystemError, FilesystemResult, preopens};
8-
use crate::p3::{
9-
DEFAULT_BUFFER_CAPACITY, FallibleIteratorProducer, FutureOneshotProducer, FutureReadyProducer,
10-
StreamEmptyProducer,
11-
};
8+
use crate::p3::{DEFAULT_BUFFER_CAPACITY, FallibleIteratorProducer};
129
use crate::{DirPerms, FilePerms};
1310
use anyhow::Context as _;
1411
use bytes::BytesMut;
15-
use core::mem;
1612
use core::pin::Pin;
1713
use core::task::{Context, Poll, ready};
14+
use core::{iter, mem};
1815
use std::io::{self, Cursor};
1916
use std::sync::Arc;
2017
use system_interface::fs::FileIoExt as _;
@@ -498,12 +495,10 @@ impl types::HostDescriptorWithStore for WasiFilesystem {
498495
let file = get_file(store.get().table, &fd)?;
499496
if !file.perms.contains(FilePerms::READ) {
500497
return Ok((
501-
StreamReader::new(instance, &mut store, StreamEmptyProducer::default()),
502-
FutureReader::new(
503-
instance,
504-
&mut store,
505-
FutureReadyProducer(Some(Err(ErrorCode::NotPermitted))),
506-
),
498+
StreamReader::new(instance, &mut store, iter::empty()),
499+
FutureReader::new(instance, &mut store, async {
500+
anyhow::Ok(Err(ErrorCode::NotPermitted))
501+
}),
507502
));
508503
}
509504

@@ -520,7 +515,7 @@ impl types::HostDescriptorWithStore for WasiFilesystem {
520515
task: None,
521516
},
522517
),
523-
FutureReader::new(instance, &mut store, FutureOneshotProducer(result_rx)),
518+
FutureReader::new(instance, &mut store, result_rx),
524519
))
525520
})
526521
}
@@ -644,12 +639,10 @@ impl types::HostDescriptorWithStore for WasiFilesystem {
644639
let dir = get_dir(store.get().table, &fd)?;
645640
if !dir.perms.contains(DirPerms::READ) {
646641
return Ok((
647-
StreamReader::new(instance, &mut store, StreamEmptyProducer::default()),
648-
FutureReader::new(
649-
instance,
650-
&mut store,
651-
FutureReadyProducer(Some(Err(ErrorCode::NotPermitted))),
652-
),
642+
StreamReader::new(instance, &mut store, iter::empty()),
643+
FutureReader::new(instance, &mut store, async {
644+
anyhow::Ok(Err(ErrorCode::NotPermitted))
645+
}),
653646
));
654647
}
655648
let allow_blocking_current_thread = dir.allow_blocking_current_thread;
@@ -667,16 +660,13 @@ impl types::HostDescriptorWithStore for WasiFilesystem {
667660
),
668661
Err(e) => {
669662
result_tx.send(Err(e.into())).unwrap();
670-
StreamReader::new(instance, &mut store, StreamEmptyProducer::default())
663+
StreamReader::new(instance, &mut store, iter::empty())
671664
}
672665
}
673666
} else {
674667
StreamReader::new(instance, &mut store, ReadDirStream::new(dir, result_tx))
675668
};
676-
Ok((
677-
stream,
678-
FutureReader::new(instance, &mut store, FutureOneshotProducer(result_rx)),
679-
))
669+
Ok((stream, FutureReader::new(instance, &mut store, result_rx)))
680670
})
681671
}
682672

crates/wasi/src/p3/mod.rs

Lines changed: 1 addition & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -17,94 +17,15 @@ pub mod sockets;
1717

1818
use crate::WasiView;
1919
use crate::p3::bindings::LinkOptions;
20-
use anyhow::Context as _;
21-
use core::marker::PhantomData;
2220
use core::pin::Pin;
2321
use core::task::{Context, Poll};
2422
use tokio::sync::oneshot;
2523
use wasmtime::StoreContextMut;
26-
use wasmtime::component::{
27-
Destination, FutureProducer, Linker, StreamProducer, StreamResult, VecBuffer,
28-
};
24+
use wasmtime::component::{Destination, Linker, StreamProducer, StreamResult, VecBuffer};
2925

3026
// Default buffer capacity to use for reads of byte-sized values.
3127
const DEFAULT_BUFFER_CAPACITY: usize = 8192;
3228

33-
pub struct StreamEmptyProducer<T>(PhantomData<fn(T) -> T>);
34-
35-
impl<T> Default for StreamEmptyProducer<T> {
36-
fn default() -> Self {
37-
Self(PhantomData)
38-
}
39-
}
40-
41-
impl<T: Send + Sync + 'static, D> StreamProducer<D> for StreamEmptyProducer<T> {
42-
type Item = T;
43-
type Buffer = Option<Self::Item>;
44-
45-
fn poll_produce<'a>(
46-
self: Pin<&mut Self>,
47-
_: &mut Context<'_>,
48-
_: StoreContextMut<'a, D>,
49-
_: Destination<'a, Self::Item, Self::Buffer>,
50-
_: bool,
51-
) -> Poll<wasmtime::Result<StreamResult>> {
52-
Poll::Ready(Ok(StreamResult::Dropped))
53-
}
54-
}
55-
56-
struct FutureReadyProducer<T>(Option<T>);
57-
58-
impl<T, D> FutureProducer<D> for FutureReadyProducer<T>
59-
where
60-
T: Send + Unpin + 'static,
61-
{
62-
type Item = T;
63-
64-
fn poll_produce(
65-
self: Pin<&mut Self>,
66-
_: &mut Context<'_>,
67-
_: StoreContextMut<D>,
68-
_: bool,
69-
) -> Poll<wasmtime::Result<Option<T>>> {
70-
let v = self
71-
.get_mut()
72-
.0
73-
.take()
74-
.context("polled after returning `Ready`")?;
75-
Poll::Ready(Ok(Some(v)))
76-
}
77-
}
78-
79-
pub struct FutureOneshotProducer<T>(oneshot::Receiver<T>);
80-
81-
impl<T> From<oneshot::Receiver<T>> for FutureOneshotProducer<T> {
82-
fn from(rx: oneshot::Receiver<T>) -> Self {
83-
Self(rx)
84-
}
85-
}
86-
87-
impl<T, D> FutureProducer<D> for FutureOneshotProducer<T>
88-
where
89-
T: Send + 'static,
90-
{
91-
type Item = T;
92-
93-
fn poll_produce(
94-
self: Pin<&mut Self>,
95-
cx: &mut Context<'_>,
96-
_: StoreContextMut<D>,
97-
finish: bool,
98-
) -> Poll<wasmtime::Result<Option<T>>> {
99-
match Pin::new(&mut self.get_mut().0).poll(cx) {
100-
Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
101-
Poll::Ready(Err(err)) => Poll::Ready(Err(err).context("oneshot sender dropped")),
102-
Poll::Pending if finish => Poll::Ready(Ok(None)),
103-
Poll::Pending => Poll::Pending,
104-
}
105-
}
106-
}
107-
10829
/// Helper structure to convert an iterator of `Result<T, E>` into a `stream<T>`
10930
/// plus a `future<result<_, T>>` in WIT.
11031
///

crates/wasi/src/p3/sockets/host/types/tcp.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
use super::is_addr_allowed;
2+
use crate::p3::DEFAULT_BUFFER_CAPACITY;
23
use crate::p3::bindings::sockets::types::{
34
Duration, ErrorCode, HostTcpSocket, HostTcpSocketWithStore, IpAddressFamily, IpSocketAddress,
45
TcpSocket,
56
};
67
use crate::p3::sockets::{SocketError, SocketResult, WasiSockets};
7-
use crate::p3::{
8-
DEFAULT_BUFFER_CAPACITY, FutureOneshotProducer, FutureReadyProducer, StreamEmptyProducer,
9-
};
108
use crate::sockets::{NonInheritedOptions, SocketAddrUse, SocketAddressFamily, WasiSocketsCtxView};
119
use anyhow::Context as _;
1210
use bytes::BytesMut;
11+
use core::iter;
1312
use core::pin::Pin;
1413
use core::task::{Context, Poll};
1514
use io_lifetimes::AsSocketlike as _;
@@ -351,16 +350,14 @@ impl HostTcpSocketWithStore for WasiSockets {
351350
result: Some(result_tx),
352351
},
353352
),
354-
FutureReader::new(instance, &mut store, FutureOneshotProducer(result_rx)),
353+
FutureReader::new(instance, &mut store, result_rx),
355354
))
356355
}
357356
None => Ok((
358-
StreamReader::new(instance, &mut store, StreamEmptyProducer::default()),
359-
FutureReader::new(
360-
instance,
361-
&mut store,
362-
FutureReadyProducer(Some(Err(ErrorCode::InvalidState))),
363-
),
357+
StreamReader::new(instance, &mut store, iter::empty()),
358+
FutureReader::new(instance, &mut store, async {
359+
anyhow::Ok(Err(ErrorCode::InvalidState))
360+
}),
364361
)),
365362
}
366363
})

0 commit comments

Comments
 (0)