Skip to content

Commit b69e53c

Browse files
committed
fix: add and use write_oneshot
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
1 parent 2d23d4b commit b69e53c

File tree

2 files changed

+31
-19
lines changed

2 files changed

+31
-19
lines changed

crates/wasi/src/p3/sockets/host/types/tcp.rs

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -192,18 +192,6 @@ impl<T> AccessorTask<T, WasiSockets, wasmtime::Result<()>> for ListenTask {
192192
}
193193
}
194194

195-
struct ResultWriteTask {
196-
result: Result<(), ErrorCode>,
197-
result_tx: FutureWriter<Result<(), ErrorCode>>,
198-
}
199-
200-
impl<T> AccessorTask<T, WasiSockets, wasmtime::Result<()>> for ResultWriteTask {
201-
async fn run(self, store: &Accessor<T, WasiSockets>) -> wasmtime::Result<()> {
202-
self.result_tx.write(store, self.result).await;
203-
Ok(())
204-
}
205-
}
206-
207195
struct ReceiveTask {
208196
stream: Arc<TcpStream>,
209197
data_tx: StreamWriter<Cursor<BytesMut>>,
@@ -250,17 +238,11 @@ impl<T> AccessorTask<T, WasiSockets, wasmtime::Result<()>> for ReceiveTask {
250238
}
251239
}
252240
};
241+
self.result_tx.write_oneshot(store, res);
253242
_ = self
254243
.stream
255244
.as_socketlike_view::<std::net::TcpStream>()
256245
.shutdown(Shutdown::Read);
257-
258-
// Write the result async from a separate task to ensure that all resources used by this
259-
// task are freed
260-
store.spawn(ResultWriteTask {
261-
result: res,
262-
result_tx: self.result_tx,
263-
});
264246
Ok(())
265247
}
266248
}

crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,36 @@ impl<T> FutureWriter<T> {
535535
}
536536
}
537537

538+
/// Write the specified value to this `future`.
539+
///
540+
/// Unlike [`write`](Self::write), this method ensures that the value has been written, but
541+
/// does not wait for the receiver to receive it.
542+
///
543+
/// # Panics
544+
///
545+
/// Panics if the store that the [`Accessor`] is derived from does not own
546+
/// this future.
547+
// TODO: Rework/refactor this as part of https://github.com/bytecodealliance/wasmtime/pull/11325
548+
#[doc(hidden)]
549+
pub fn write_oneshot(mut self, accessor: impl AsAccessor, value: T)
550+
where
551+
T: Send + 'static,
552+
{
553+
// FIXME: this is intended to be used in the future to directly
554+
// manipulate state for this future within the store without having to
555+
// go through an mpsc.
556+
let _accessor = accessor.as_accessor();
557+
let (tx, _) = oneshot::channel();
558+
send(
559+
&mut self.tx.as_mut().unwrap(),
560+
WriteEvent::Write {
561+
buffer: Some(value),
562+
tx,
563+
},
564+
);
565+
self.default = None;
566+
}
567+
538568
/// Wait for the read end of this `future` is dropped.
539569
///
540570
/// The [`Accessor`] provided can be acquired from [`Instance::run_concurrent`] or

0 commit comments

Comments
 (0)