Skip to content
This repository was archived by the owner on Sep 8, 2025. It is now read-only.

Commit 24193dd

Browse files
committed
address unsafe buffer management in futures_and_streams
This consolidates some unsafe code to avoid the need for non-local reasoning about invariants. Specifically, we can now efficiently transfer ownership of buffered items for host-based streams and futures without requiring unsafe blocks. Signed-off-by: Joel Dice <joel.dice@fermyon.com>
1 parent b20be42 commit 24193dd

2 files changed

Lines changed: 105 additions & 110 deletions

File tree

crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs

Lines changed: 22 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use {
4242
},
4343
};
4444

45-
pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
45+
pub use buffers::{ReadBuffer, TakeBuffer, VecBuffer, WriteBuffer};
4646

4747
mod buffers;
4848

@@ -144,11 +144,7 @@ fn waitable_state(ty: TableIndex, state: StreamFutureState) -> WaitableState {
144144

145145
/// Return a closure which matches a host write operation to a read (or close)
146146
/// operation.
147-
///
148-
/// SAFETY: The `ComponentInstance` passed to the returned closure must, when
149-
/// paired with a `Reader::Guest { .. }`, match the one the stream or future
150-
/// belongs to.
151-
unsafe fn accept_reader<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
147+
fn accept_reader<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
152148
store: StoreContextMut<U>,
153149
mut buffer: B,
154150
tx: oneshot::Sender<HostResult<B>>,
@@ -203,11 +199,8 @@ unsafe fn accept_reader<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: '
203199
ReturnCode::Completed(count.try_into().unwrap())
204200
}
205201
Reader::Host { accept } => {
206-
// SAFETY: Per the requirements described in `host_read`'s
207-
// documentation, we give up ownership of the items by calling
208-
// `buffer.forget` immediately after the following call.
209-
let count = accept(buffer.remaining().as_ptr().cast(), buffer.remaining().len());
210-
buffer.forget(count);
202+
let count = buffer.remaining().len();
203+
let count = accept(&mut buffer, count);
211204
_ = tx.send(HostResult {
212205
buffer,
213206
closed: false,
@@ -229,17 +222,7 @@ unsafe fn accept_reader<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: '
229222

230223
/// Return a closure which matches a host read operation to a write (or close)
231224
/// operation.
232-
///
233-
/// SAFETY: If and when a `Writer::Host { .. }` is passed to the returned
234-
/// closure, the `pointer` field must be a valid `*mut T` array of `count`
235-
/// elements, and those elements must be forgotten using e.g. `mem::forget`
236-
/// after the call to the closure returns since they will have been moved into
237-
/// another buffer (i.e. the destination buffer will take ownership of them).
238-
// TODO: This should return a `impl UnsafeFnOnce` (where `UnsafeFnOnce` would
239-
// need to be a trait we define ourselves, since there's no standard equivalent)
240-
// rather than a `impl FnOnce`. That would force the caller to use an unsafe
241-
// block and (hopefully) uphold the contract we've described above.
242-
unsafe fn accept_writer<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
225+
fn accept_writer<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
243226
mut buffer: B,
244227
tx: oneshot::Sender<HostResult<B>>,
245228
) -> impl FnOnce(Writer) -> Result<ReturnCode> + Send + Sync + 'static {
@@ -279,12 +262,12 @@ unsafe fn accept_writer<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
279262
});
280263
ReturnCode::Completed(count.try_into().unwrap())
281264
}
282-
Writer::Host { pointer, count } => {
265+
Writer::Host {
266+
buffer: input,
267+
count,
268+
} => {
283269
let count = count.min(buffer.remaining_capacity());
284-
// SAFETY: Per the contact of `accept_writer`, `pointer` is a
285-
// valid `*mut T` array of `count` elements which we may move
286-
// (i.e. transfer ownership) into the destination buffer.
287-
unsafe { buffer.copy_from(pointer.cast(), count) };
270+
buffer.move_from(input, count);
288271
_ = tx.send(HostResult {
289272
buffer,
290273
closed: false,
@@ -1423,7 +1406,7 @@ enum Writer<'a> {
14231406
count: usize,
14241407
},
14251408
Host {
1426-
pointer: *const u8,
1409+
buffer: &'a mut dyn TakeBuffer,
14271410
count: usize,
14281411
},
14291412
End,
@@ -1441,7 +1424,7 @@ enum Reader<'a> {
14411424
count: usize,
14421425
},
14431426
Host {
1444-
accept: Box<dyn FnOnce(*const u8, usize) -> usize>,
1427+
accept: Box<dyn FnOnce(&mut dyn TakeBuffer, usize) -> usize>,
14451428
},
14461429
End,
14471430
}
@@ -1642,11 +1625,7 @@ impl ComponentInstance {
16421625
let rep = my_rep.unwrap();
16431626
match event {
16441627
ReadEvent::Read { buffer, tx } => {
1645-
// SAFETY: See the `Reader::Host` case of the
1646-
// closure returned by `accept_reader` for where we
1647-
// satisfy the requirements documented for
1648-
// `host_read`.
1649-
super::with_local_instance(|store, instance| unsafe {
1628+
super::with_local_instance(|store, instance| {
16501629
instance.host_read::<_, _, U>(
16511630
token.as_context_mut(store),
16521631
rep,
@@ -1774,11 +1753,7 @@ impl ComponentInstance {
17741753
assert!(matches!(&transmit.write, WriteState::Open));
17751754

17761755
transmit.write = WriteState::HostReady {
1777-
// SAFETY: The closure we store here will only be called
1778-
// with a `Reader::Guest { .. }` parameter from
1779-
// `guest_read`, which upholds the requirements in
1780-
// `accept_reader`'s documentation.
1781-
accept: Box::new(unsafe { accept_reader::<T, B, U>(store, buffer, tx) }),
1756+
accept: Box::new(accept_reader::<T, B, U>(store, buffer, tx)),
17821757
post_write,
17831758
};
17841759
post_write = PostWrite::Continue;
@@ -1794,10 +1769,7 @@ impl ComponentInstance {
17941769
..
17951770
} => {
17961771
let read_handle = transmit.read_handle;
1797-
// SAFETY: See the contract documented for this function and
1798-
// note that it covers the requirements specified in
1799-
// `accept_reader`'s documentation.
1800-
let code = unsafe { accept_reader::<T, B, U>(store.as_context_mut(), buffer, tx) }(
1772+
let code = accept_reader::<T, B, U>(store.as_context_mut(), buffer, tx)(
18011773
store.0.traitobj_mut(),
18021774
self,
18031775
Reader::Guest {
@@ -1818,17 +1790,14 @@ impl ComponentInstance {
18181790
}
18191791

18201792
ReadState::HostReady { accept } => {
1821-
// SAFETY: Per the requirements described in `accept_writer`'s
1822-
// documentation, we give up ownership of the items by calling
1823-
// `buffer.forget` immediately after the following call.
1793+
let count = buffer.remaining().len();
18241794
let code = accept(Writer::Host {
1825-
pointer: buffer.remaining().as_ptr().cast(),
1826-
count: buffer.remaining().len(),
1795+
buffer: &mut buffer,
1796+
count,
18271797
})?;
1828-
let ReturnCode::Completed(n) = code else {
1798+
let ReturnCode::Completed(_) = code else {
18291799
unreachable!()
18301800
};
1831-
buffer.forget(n.try_into().unwrap());
18321801

18331802
_ = tx.send(HostResult {
18341803
buffer,
@@ -1852,20 +1821,7 @@ impl ComponentInstance {
18521821
}
18531822

18541823
/// Attempt to read items from the specified stream or future.
1855-
///
1856-
/// SAFETY: When the `TransmitState::write` field of the state to
1857-
/// which the stream or future belongs is `WriteState::HostReady`, its
1858-
/// `accept` callback will be passed a `Reader::Host` whose `accept` closure
1859-
/// requries a valid `*mut T` array of `count` elements, and those elements
1860-
/// must be forgotten using e.g. `mem::forget` after the call to that
1861-
/// closure returns since they will have been moved into another buffer
1862-
/// (i.e. the destination buffer will take ownership of them).
1863-
// TODO: `Reader::Host::accept` should really be a `Box<dyn UnsafeFnOnce>`
1864-
// (where `UnsafeFnOnce` would need to be a trait we define ourselves, since
1865-
// there's no standard equivalent) rather than a `Box<dyn FnOnce>`. That
1866-
// would force the caller to use an unsafe block and (hopefully) uphold the
1867-
// contract described above.
1868-
unsafe fn host_read<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U: 'static>(
1824+
fn host_read<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U: 'static>(
18691825
&mut self,
18701826
store: StoreContextMut<U>,
18711827
rep: u32,
@@ -1941,13 +1897,9 @@ impl ComponentInstance {
19411897
store.0.traitobj_mut(),
19421898
self,
19431899
Reader::Host {
1944-
accept: Box::new(move |pointer, count| {
1900+
accept: Box::new(move |input, count| {
19451901
let count = count.min(buffer.remaining_capacity());
1946-
// SAFETY: Per the contact of `host_read`, `pointer`
1947-
// is a valid `*mut T` array of `count` elements
1948-
// which we may move (i.e. transfer ownership) into
1949-
// the destination buffer.
1950-
unsafe { buffer.copy_from(pointer.cast(), count) };
1902+
buffer.move_from(input, count);
19511903
_ = tx.send(HostResult {
19521904
buffer,
19531905
closed: false,

0 commit comments

Comments
 (0)