Skip to content

Commit 87cb14b

Browse files
committed
fix: add and use write_oneshot
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
1 parent 2d23d4b commit 87cb14b

File tree

2 files changed

+29
-19
lines changed

2 files changed

+29
-19
lines changed

crates/wasi/src/p3/sockets/host/types/tcp.rs

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -192,18 +192,6 @@ impl<T> AccessorTask<T, WasiSockets, wasmtime::Result<()>> for ListenTask {
192192
}
193193
}
194194

195-
struct ResultWriteTask {
196-
result: Result<(), ErrorCode>,
197-
result_tx: FutureWriter<Result<(), ErrorCode>>,
198-
}
199-
200-
impl<T> AccessorTask<T, WasiSockets, wasmtime::Result<()>> for ResultWriteTask {
201-
async fn run(self, store: &Accessor<T, WasiSockets>) -> wasmtime::Result<()> {
202-
self.result_tx.write(store, self.result).await;
203-
Ok(())
204-
}
205-
}
206-
207195
struct ReceiveTask {
208196
stream: Arc<TcpStream>,
209197
data_tx: StreamWriter<Cursor<BytesMut>>,
@@ -254,13 +242,7 @@ impl<T> AccessorTask<T, WasiSockets, wasmtime::Result<()>> for ReceiveTask {
254242
.stream
255243
.as_socketlike_view::<std::net::TcpStream>()
256244
.shutdown(Shutdown::Read);
257-
258-
// Write the result async from a separate task to ensure that all resources used by this
259-
// task are freed
260-
store.spawn(ResultWriteTask {
261-
result: res,
262-
result_tx: self.result_tx,
263-
});
245+
self.result_tx.write_oneshot(store, res);
264246
Ok(())
265247
}
266248
}

crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,34 @@ impl<T> FutureWriter<T> {
535535
}
536536
}
537537

538+
/// Write the specified value to this `future`.
539+
///
540+
/// Unlike [`write`](Self::write), this method ensures that the value has been written, but
541+
/// does not wait for the receiver to receive it.
542+
///
543+
/// # Panics
544+
///
545+
/// Panics if the store that the [`Accessor`] is derived from does not own
546+
/// this future.
547+
pub fn write_oneshot(mut self, accessor: impl AsAccessor, value: T)
548+
where
549+
T: Send + 'static,
550+
{
551+
// FIXME: this is intended to be used in the future to directly
552+
// manipulate state for this future within the store without having to
553+
// go through an mpsc.
554+
let _accessor = accessor.as_accessor();
555+
let (tx, _) = oneshot::channel();
556+
send(
557+
&mut self.tx.as_mut().unwrap(),
558+
WriteEvent::Write {
559+
buffer: Some(value),
560+
tx,
561+
},
562+
);
563+
self.default = None;
564+
}
565+
538566
/// Wait for the read end of this `future` is dropped.
539567
///
540568
/// The [`Accessor`] provided can be acquired from [`Instance::run_concurrent`] or

0 commit comments

Comments
 (0)