Skip to content

Commit 92f7974

Browse files
authored
wasip3: Refine 0-length behavior in cli/sockets (#11614)
Add some comments for issues and `if` blocks in a few places. I'd like to add some tests but I ran into #11611 so I'll defer tests to later.
1 parent db7b44d commit 92f7974

File tree

3 files changed

+88
-59
lines changed

3 files changed

+88
-59
lines changed

crates/wasi/src/p3/cli/host.rs

Lines changed: 42 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -33,27 +33,34 @@ impl<D> StreamProducer<D> for InputStreamProducer {
3333
dst: Destination<'a, Self::Item, Self::Buffer>,
3434
finish: bool,
3535
) -> Poll<wasmtime::Result<StreamResult>> {
36-
if let Some(0) = dst.remaining(store.as_context_mut()) {
37-
Poll::Ready(Ok(StreamResult::Completed))
38-
} else {
39-
let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
40-
let mut buf = ReadBuf::new(dst.remaining());
41-
match self.rx.as_mut().poll_read(cx, &mut buf) {
42-
Poll::Ready(Ok(())) if buf.filled().is_empty() => {
43-
Poll::Ready(Ok(StreamResult::Dropped))
44-
}
45-
Poll::Ready(Ok(())) => {
46-
let n = buf.filled().len();
47-
dst.mark_written(n);
48-
Poll::Ready(Ok(StreamResult::Completed))
49-
}
50-
Poll::Ready(Err(..)) => {
51-
// TODO: Report the error to the guest
52-
Poll::Ready(Ok(StreamResult::Dropped))
53-
}
54-
Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
55-
Poll::Pending => Poll::Pending,
36+
// If the destination buffer is empty then this is a request on
37+
// behalf of the guest to wait for this input stream to be readable.
38+
// The `AsyncRead` trait abstraction does not provide the ability to
39+
// await this event so we're forced to basically just lie here and
40+
// say we're ready read data later.
41+
//
42+
// See WebAssembly/component-model#561 for some more information.
43+
if dst.remaining(store.as_context_mut()) == Some(0) {
44+
return Poll::Ready(Ok(StreamResult::Completed));
45+
}
46+
47+
let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
48+
let mut buf = ReadBuf::new(dst.remaining());
49+
match self.rx.as_mut().poll_read(cx, &mut buf) {
50+
Poll::Ready(Ok(())) if buf.filled().is_empty() => {
51+
Poll::Ready(Ok(StreamResult::Dropped))
52+
}
53+
Poll::Ready(Ok(())) => {
54+
let n = buf.filled().len();
55+
dst.mark_written(n);
56+
Poll::Ready(Ok(StreamResult::Completed))
57+
}
58+
Poll::Ready(Err(..)) => {
59+
// TODO: Report the error to the guest
60+
Poll::Ready(Ok(StreamResult::Dropped))
5661
}
62+
Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
63+
Poll::Pending => Poll::Pending,
5764
}
5865
}
5966
}
@@ -74,17 +81,26 @@ impl<D> StreamConsumer<D> for OutputStreamConsumer {
7481
) -> Poll<wasmtime::Result<StreamResult>> {
7582
let mut src = src.as_direct(store);
7683
let buf = src.remaining();
84+
85+
// If the source buffer is empty then this is a request on behalf of
86+
// the guest to wait for this output stream to be writable. The
87+
// `AsyncWrite` trait abstraction does not provide the ability to await
88+
// this event so we're forced to basically just lie here and say we're
89+
// ready write data later.
90+
//
91+
// See WebAssembly/component-model#561 for some more information.
92+
if buf.len() == 0 {
93+
return Poll::Ready(Ok(StreamResult::Completed));
94+
}
7795
match self.tx.as_mut().poll_write(cx, buf) {
78-
Poll::Ready(Ok(n)) if buf.is_empty() => {
79-
debug_assert_eq!(n, 0);
80-
Poll::Ready(Ok(StreamResult::Completed))
81-
}
8296
Poll::Ready(Ok(n)) => {
8397
src.mark_read(n);
8498
Poll::Ready(Ok(StreamResult::Completed))
8599
}
86-
Poll::Ready(Err(..)) => {
87-
// TODO: Report the error to the guest
100+
Poll::Ready(Err(e)) => {
101+
// FIXME(WebAssembly/wasi-cli#81) should communicate this
102+
// error to the guest somehow.
103+
tracing::warn!("dropping stdin error: {e}");
88104
Poll::Ready(Ok(StreamResult::Dropped))
89105
}
90106
Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),

crates/wasi/src/p3/sockets/host/types/tcp.rs

Lines changed: 43 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,16 @@ where
6565
mut dst: Destination<'a, Self::Item, Self::Buffer>,
6666
finish: bool,
6767
) -> Poll<wasmtime::Result<StreamResult>> {
68+
// If the destination buffer is empty then this is a request on
69+
// behalf of the guest to wait for this socket to be ready to accept
70+
// without actually accepting something. The `TcpListener` in Tokio does
71+
// not have this capability so we're forced to lie here and say instead
72+
// "yes we're ready to accept" as a fallback.
73+
//
74+
// See WebAssembly/component-model#561 for some more information.
75+
if dst.remaining(&mut store) == Some(0) {
76+
return Poll::Ready(Ok(StreamResult::Completed));
77+
}
6878
let res = match self.listener.poll_accept(cx) {
6979
Poll::Ready(res) => res.map(|(stream, _)| stream),
7080
Poll::Pending if finish => return Poll::Ready(Ok(StreamResult::Cancelled)),
@@ -116,37 +126,37 @@ impl<D> StreamProducer<D> for ReceiveStreamProducer {
116126
finish: bool,
117127
) -> Poll<wasmtime::Result<StreamResult>> {
118128
let res = 'result: {
119-
if let Some(0) = dst.remaining(store.as_context_mut()) {
120-
match self.stream.poll_read_ready(cx) {
121-
Poll::Ready(Ok(())) => return Poll::Ready(Ok(StreamResult::Completed)),
129+
// 0-length reads are an indication that we should wait for
130+
// readiness here, so use `poll_read_ready`.
131+
if dst.remaining(store.as_context_mut()) == Some(0) {
132+
return match self.stream.poll_read_ready(cx) {
133+
Poll::Ready(Ok(())) => Poll::Ready(Ok(StreamResult::Completed)),
122134
Poll::Ready(Err(err)) => break 'result Err(err.into()),
123-
Poll::Pending if finish => {
124-
return Poll::Ready(Ok(StreamResult::Cancelled));
135+
Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
136+
Poll::Pending => Poll::Pending,
137+
};
138+
}
139+
140+
let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
141+
let buf = dst.remaining();
142+
loop {
143+
match self.stream.try_read(buf) {
144+
Ok(0) => break 'result Ok(()),
145+
Ok(n) => {
146+
dst.mark_written(n);
147+
return Poll::Ready(Ok(StreamResult::Completed));
125148
}
126-
Poll::Pending => return Poll::Pending,
127-
}
128-
} else {
129-
let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
130-
let buf = dst.remaining();
131-
loop {
132-
match self.stream.try_read(buf) {
133-
Ok(0) => break 'result Ok(()),
134-
Ok(n) => {
135-
dst.mark_written(n);
136-
return Poll::Ready(Ok(StreamResult::Completed));
137-
}
138-
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
139-
match self.stream.poll_read_ready(cx) {
140-
Poll::Ready(Ok(())) => continue,
141-
Poll::Ready(Err(err)) => break 'result Err(err.into()),
142-
Poll::Pending if finish => {
143-
return Poll::Ready(Ok(StreamResult::Cancelled));
144-
}
145-
Poll::Pending => return Poll::Pending,
149+
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
150+
match self.stream.poll_read_ready(cx) {
151+
Poll::Ready(Ok(())) => continue,
152+
Poll::Ready(Err(err)) => break 'result Err(err.into()),
153+
Poll::Pending if finish => {
154+
return Poll::Ready(Ok(StreamResult::Cancelled));
146155
}
156+
Poll::Pending => return Poll::Pending,
147157
}
148-
Err(err) => break 'result Err(err.into()),
149158
}
159+
Err(err) => break 'result Err(err.into()),
150160
}
151161
}
152162
};
@@ -190,13 +200,15 @@ impl<D> StreamConsumer<D> for SendStreamConsumer {
190200
) -> Poll<wasmtime::Result<StreamResult>> {
191201
let mut src = src.as_direct(store);
192202
let res = 'result: {
203+
// A 0-length write is a request to wait for readiness so use
204+
// `poll_write_ready` to wait for the underlying object to be ready.
193205
if src.remaining().is_empty() {
194-
match self.stream.poll_write_ready(cx) {
195-
Poll::Ready(Ok(())) => return Poll::Ready(Ok(StreamResult::Completed)),
206+
return match self.stream.poll_write_ready(cx) {
207+
Poll::Ready(Ok(())) => Poll::Ready(Ok(StreamResult::Completed)),
196208
Poll::Ready(Err(err)) => break 'result Err(err.into()),
197-
Poll::Pending if finish => return Poll::Ready(Ok(StreamResult::Cancelled)),
198-
Poll::Pending => return Poll::Pending,
199-
}
209+
Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
210+
Poll::Pending => Poll::Pending,
211+
};
200212
}
201213
loop {
202214
match self.stream.try_write(src.remaining()) {

crates/wasi/tests/all/store.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ impl<T> Ctx<T> {
2424
name: &str,
2525
configure: impl FnOnce(&mut WasiCtxBuilder) -> T,
2626
) -> Result<(Store<Ctx<T>>, TempDir)> {
27-
let stdout = MemoryOutputPipe::new(4096);
28-
let stderr = MemoryOutputPipe::new(4096);
27+
const MAX_OUTPUT_SIZE: usize = 10 << 20;
28+
let stdout = MemoryOutputPipe::new(MAX_OUTPUT_SIZE);
29+
let stderr = MemoryOutputPipe::new(MAX_OUTPUT_SIZE);
2930
let workspace = prepare_workspace(name)?;
3031

3132
// Create our wasi context.

0 commit comments

Comments
 (0)