Skip to content
This repository was archived by the owner on Sep 8, 2025. It is now read-only.

Commit 304a7f1

Browse files
committed
avoid allocating oneshot channel in watch_{reader,writer}
Signed-off-by: Joel Dice <joel.dice@fermyon.com>
1 parent a265abf commit 304a7f1

File tree

1 file changed

+58
-50
lines changed

1 file changed

+58
-50
lines changed

crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs

Lines changed: 58 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ use buffers::UntypedWriteBuffer;
1919
use futures::channel::oneshot;
2020
use std::boxed::Box;
2121
use std::fmt;
22+
use std::future;
2223
use std::iter;
2324
use std::marker::PhantomData;
2425
use std::mem::{self, MaybeUninit};
2526
use std::ops::{Deref, DerefMut};
2627
use std::ptr::NonNull;
2728
use std::string::{String, ToString};
2829
use std::sync::Arc;
30+
use std::task::{Poll, Waker};
2931
use std::vec::Vec;
3032
use wasmtime_environ::component::{
3133
CanonicalAbiInfo, ComponentTypes, InterfaceType, RuntimeComponentInstanceIndex, StringEncoding,
@@ -312,56 +314,58 @@ fn accept_writer<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
312314
/// Return a `Future` which will resolve once the reader end corresponding to
313315
/// the specified writer end of a future or stream is dropped.
314316
async fn watch_reader(accessor: impl AsAccessor, instance: Instance, id: TableId<TransmitHandle>) {
315-
let result = accessor.as_accessor().with(|mut access| {
316-
let concurrent_state = instance.concurrent_state_mut(access.as_context_mut().0);
317-
let state_id = concurrent_state.get(id)?.state;
318-
let state = concurrent_state.get_mut(state_id)?;
319-
anyhow::Ok(if matches!(&state.read, ReadState::Dropped) {
320-
None
321-
} else {
322-
let (tx, rx) = oneshot::channel();
323-
state.reader_watcher = Some(tx);
324-
Some(rx)
325-
})
326-
});
327-
328-
if let Ok(Some(rx)) = result {
329-
_ = rx.await
330-
}
317+
future::poll_fn(|cx| {
318+
accessor
319+
.as_accessor()
320+
.with(|mut access| {
321+
let concurrent_state = instance.concurrent_state_mut(access.as_context_mut().0);
322+
let state_id = concurrent_state.get(id)?.state;
323+
let state = concurrent_state.get_mut(state_id)?;
324+
anyhow::Ok(if matches!(&state.read, ReadState::Dropped) {
325+
Poll::Ready(())
326+
} else {
327+
state.reader_watcher = Some(cx.waker().clone());
328+
Poll::Pending
329+
})
330+
})
331+
.unwrap_or(Poll::Ready(()))
332+
})
333+
.await
331334
}
332335

333336
/// Return a `Future` which will resolve once the writer end corresponding to
334337
/// the specified reader end of a future or stream is dropped.
335338
async fn watch_writer(accessor: impl AsAccessor, instance: Instance, id: TableId<TransmitHandle>) {
336-
let result = accessor.as_accessor().with(|mut access| {
337-
let concurrent_state = instance.concurrent_state_mut(access.as_context_mut().0);
338-
let state_id = concurrent_state.get(id)?.state;
339-
let state = concurrent_state.get_mut(state_id)?;
340-
anyhow::Ok(
341-
if matches!(
342-
&state.write,
343-
WriteState::Dropped
344-
| WriteState::GuestReady {
345-
post_write: PostWrite::Drop,
346-
..
347-
}
348-
| WriteState::HostReady {
349-
post_write: PostWrite::Drop,
350-
..
351-
}
352-
) {
353-
None
354-
} else {
355-
let (tx, rx) = oneshot::channel();
356-
state.writer_watcher = Some(tx);
357-
Some(rx)
358-
},
359-
)
360-
});
361-
362-
if let Ok(Some(rx)) = result {
363-
_ = rx.await
364-
}
339+
future::poll_fn(|cx| {
340+
accessor
341+
.as_accessor()
342+
.with(|mut access| {
343+
let concurrent_state = instance.concurrent_state_mut(access.as_context_mut().0);
344+
let state_id = concurrent_state.get(id)?.state;
345+
let state = concurrent_state.get_mut(state_id)?;
346+
anyhow::Ok(
347+
if matches!(
348+
&state.write,
349+
WriteState::Dropped
350+
| WriteState::GuestReady {
351+
post_write: PostWrite::Drop,
352+
..
353+
}
354+
| WriteState::HostReady {
355+
post_write: PostWrite::Drop,
356+
..
357+
}
358+
) {
359+
Poll::Ready(())
360+
} else {
361+
state.writer_watcher = Some(cx.waker().clone());
362+
Poll::Pending
363+
},
364+
)
365+
})
366+
.unwrap_or(Poll::Ready(()))
367+
})
368+
.await
365369
}
366370

367371
/// Represents the state of a stream or future handle from the perspective of a
@@ -1405,14 +1409,14 @@ struct TransmitState {
14051409
write: WriteState,
14061410
/// See `ReadState`
14071411
read: ReadState,
1408-
/// The `Sender`, if any, to be dropped when the write end of the stream or
1412+
/// The `Waker`, if any, to be woken when the write end of the stream or
14091413
/// future is dropped.
14101414
///
14111415
/// This will signal to the host-owned read end that the write end has been
14121416
/// dropped.
1413-
writer_watcher: Option<oneshot::Sender<()>>,
1417+
writer_watcher: Option<Waker>,
14141418
/// Like `writer_watcher`, but for the reverse direction.
1415-
reader_watcher: Option<oneshot::Sender<()>>,
1419+
reader_watcher: Option<Waker>,
14161420
/// Whether futher values may be transmitted via this stream or future.
14171421
done: bool,
14181422
}
@@ -1866,7 +1870,9 @@ impl Instance {
18661870
);
18671871

18681872
transmit.read = ReadState::Dropped;
1869-
transmit.reader_watcher = None;
1873+
if let Some(waker) = transmit.reader_watcher.take() {
1874+
waker.wake();
1875+
}
18701876

18711877
// If the write end is already dropped, it should stay dropped,
18721878
// otherwise, it should be opened.
@@ -1952,7 +1958,9 @@ impl Instance {
19521958
transmit.write
19531959
);
19541960

1955-
transmit.writer_watcher = None;
1961+
if let Some(waker) = transmit.writer_watcher.take() {
1962+
waker.wake();
1963+
}
19561964

19571965
// Existing queued transmits must be updated with information for the impending writer closure
19581966
match &mut transmit.write {

0 commit comments

Comments
 (0)