Skip to content
This repository was archived by the owner on Sep 8, 2025. It is now read-only.

Commit 6ba521d

Browse files
authored
Merge pull request #178 from bytecodealliance/dicej/fix-172
trap if closing future to which no value has been written
2 parents 10355ff + e10ad9b commit 6ba521d

3 files changed

Lines changed: 174 additions & 12 deletions

File tree

crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1416,6 +1416,8 @@ struct TransmitState {
14161416
writer_watcher: Option<oneshot::Sender<()>>,
14171417
/// Like `writer_watcher`, but for the reverse direction.
14181418
reader_watcher: Option<oneshot::Sender<()>>,
1419+
/// Whether the write end may be closed or not.
1420+
may_close_writer: bool,
14191421
}
14201422

14211423
impl Default for TransmitState {
@@ -1427,6 +1429,7 @@ impl Default for TransmitState {
14271429
write: WriteState::Open,
14281430
reader_watcher: None,
14291431
writer_watcher: None,
1432+
may_close_writer: true,
14301433
}
14311434
}
14321435
}
@@ -1550,7 +1553,7 @@ impl Instance {
15501553
store
15511554
.as_context_mut()
15521555
.with_detached_instance(self, |mut store, instance| {
1553-
let (write, read) = instance.new_transmit()?;
1556+
let (write, read) = instance.new_transmit(TransmitKind::Future)?;
15541557

15551558
Ok((
15561559
FutureWriter::new(
@@ -1591,7 +1594,7 @@ impl Instance {
15911594
store
15921595
.as_context_mut()
15931596
.with_detached_instance(self, |mut store, instance| {
1594-
let (write, read) = instance.new_transmit()?;
1597+
let (write, read) = instance.new_transmit(TransmitKind::Stream)?;
15951598

15961599
Ok((
15971600
StreamWriter::new(
@@ -1899,7 +1902,10 @@ impl ComponentInstance {
18991902

19001903
/// Allocate a new future or stream, including the `TransmitState` and the
19011904
/// `TransmitHandle`s corresponding to the read and write ends.
1902-
fn new_transmit(&mut self) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
1905+
fn new_transmit(
1906+
&mut self,
1907+
kind: TransmitKind,
1908+
) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
19031909
let state_id = self.push(TransmitState::default())?;
19041910

19051911
let write = self.push(TransmitHandle::new(state_id))?;
@@ -1909,6 +1915,10 @@ impl ComponentInstance {
19091915
state.write_handle = write;
19101916
state.read_handle = read;
19111917

1918+
if let TransmitKind::Future = kind {
1919+
state.may_close_writer = false;
1920+
}
1921+
19121922
log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
19131923

19141924
Ok((write, read))
@@ -1941,7 +1951,10 @@ impl ComponentInstance {
19411951
/// write ends to the (sub-)component instance to which the specified
19421952
/// `TableIndex` belongs.
19431953
fn guest_new(&mut self, ty: TableIndex) -> Result<ResourcePair> {
1944-
let (write, read) = self.new_transmit()?;
1954+
let (write, read) = self.new_transmit(match ty {
1955+
TableIndex::Future(_) => TransmitKind::Future,
1956+
TableIndex::Stream(_) => TransmitKind::Stream,
1957+
})?;
19451958
let read = self
19461959
.state_table(ty)
19471960
.insert(read.rep(), waitable_state(ty, StreamFutureState::Read))?;
@@ -1975,6 +1988,7 @@ impl ComponentInstance {
19751988
let transmit = self
19761989
.get_mut(transmit_id)
19771990
.with_context(|| format!("retrieving state for transmit [{transmit_rep}]"))?;
1991+
transmit.may_close_writer = true;
19781992

19791993
let new_state = if let ReadState::Closed = &transmit.read {
19801994
ReadState::Closed
@@ -2176,8 +2190,11 @@ impl ComponentInstance {
21762190

21772191
/// Cancel a pending stream or future write from the host.
21782192
///
2179-
/// `rep` is the `TransmitState` rep for the stream or future.
2180-
fn host_cancel_write(&mut self, rep: u32) -> Result<ReturnCode> {
2193+
/// # Arguments
2194+
///
2195+
/// * `rep` - The `TransmitState` rep for the stream or future.
2196+
/// * `kind` - Whether `rep` is for a stream or a future.
2197+
fn host_cancel_write(&mut self, rep: u32, kind: TransmitKind) -> Result<ReturnCode> {
21812198
let transmit_id = TableId::<TransmitState>::new(rep);
21822199
let transmit = self.get_mut(transmit_id)?;
21832200

@@ -2208,6 +2225,10 @@ impl ComponentInstance {
22082225

22092226
log::trace!("cancelled write {transmit_id:?}");
22102227

2228+
if let (TransmitKind::Future, ReturnCode::Cancelled(0)) = (kind, code) {
2229+
transmit.may_close_writer = false;
2230+
}
2231+
22112232
Ok(code)
22122233
}
22132234

@@ -2259,6 +2280,10 @@ impl ComponentInstance {
22592280
.get_mut(transmit_id)
22602281
.with_context(|| format!("error closing writer {transmit_rep}"))?;
22612282

2283+
if !transmit.may_close_writer {
2284+
bail!("cannot close future write end without first writing a value")
2285+
}
2286+
22622287
transmit.writer_watcher = None;
22632288

22642289
// Existing queued transmits must be updated with information for the impending writer closure
@@ -2652,6 +2677,7 @@ impl ComponentInstance {
26522677
let transmit_id = self.get(transmit_handle)?.state;
26532678
log::trace!("guest_write {transmit_handle:?} (handle {handle}; state {transmit_id:?})",);
26542679
let transmit = self.get_mut(transmit_id)?;
2680+
transmit.may_close_writer = true;
26552681
let new_state = if let ReadState::Closed = &transmit.read {
26562682
ReadState::Closed
26572683
} else {
@@ -3018,10 +3044,11 @@ impl ComponentInstance {
30183044
writer: u32,
30193045
_async_: bool,
30203046
) -> Result<ReturnCode> {
3021-
let (rep, WaitableState::Stream(_, state) | WaitableState::Future(_, state)) =
3022-
self.state_table(ty).get_mut_by_index(writer)?
3023-
else {
3024-
bail!("invalid stream or future handle");
3047+
let (rep, state) = self.state_table(ty).get_mut_by_index(writer)?;
3048+
let (state, kind) = match state {
3049+
WaitableState::Stream(_, state) => (state, TransmitKind::Stream),
3050+
WaitableState::Future(_, state) => (state, TransmitKind::Future),
3051+
_ => bail!("invalid stream or future handle"),
30253052
};
30263053
let id = TableId::<TransmitHandle>::new(rep);
30273054
log::trace!("guest cancel write {id:?} (handle {writer})");
@@ -3037,7 +3064,7 @@ impl ComponentInstance {
30373064
}
30383065
}
30393066
let rep = self.get(id)?.state.rep();
3040-
self.host_cancel_write(rep)
3067+
self.host_cancel_write(rep, kind)
30413068
}
30423069

30433070
/// Cancel a pending read for the specified stream or future from the guest.

tests/misc_testsuite/component-model-async/future-cancel-read-closed.wast

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@
88
(core instance $libc (instantiate $libc))
99
(core func $read (canon future.read $f async (memory $libc "mem")))
1010
(core func $cancel (canon future.cancel-read $f))
11+
(core func $write (canon future.write $f async (memory $libc "mem")))
1112
(core func $close-write (canon future.close-writable $f))
1213
(core module $m
1314
(import "" "new" (func $new (result i64)))
1415
(import "" "read" (func $read (param i32 i32) (result i32)))
1516
(import "" "cancel" (func $cancel (param i32) (result i32)))
17+
(import "" "write" (func $write (param i32 i32) (result i32)))
1618
(import "" "close-write" (func $close-write (param i32)))
1719

1820
(func (export "f") (result i32)
@@ -34,6 +36,10 @@
3436

3537
;; close the write end
3638
local.get $write
39+
i32.const 0
40+
call $write
41+
drop
42+
local.get $write
3743
call $close-write
3844

3945
;; cancel the read, returning the result
@@ -47,11 +53,12 @@
4753
(export "new" (func $new))
4854
(export "read" (func $read))
4955
(export "cancel" (func $cancel))
56+
(export "write" (func $write))
5057
(export "close-write" (func $close-write))
5158
))
5259
))
5360

5461
(func (export "f") (result u32) (canon lift (core func $i "f")))
5562
)
5663

57-
(assert_return (invoke "f") (u32.const 1)) ;; expect CLOSED status (not CANCELLED)
64+
(assert_return (invoke "f") (u32.const 0x11)) ;; expect CLOSED status (not CANCELLED)
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
;;! component_model_async = true
2+
3+
;; This test contains two components $C and $D that test that a trap occurs
4+
;; when closing the writable end of a future (in $C) before having written
5+
;; a value while closing the readable end of a future (in $D) before reading
6+
;; a value is fine.
7+
;;
8+
;; (Copied from
9+
;; https://github.com/WebAssembly/component-model/blob/future-trap/test/async/futures-must-write.wast)
10+
(component
11+
(component $C
12+
(core module $Memory (memory (export "mem") 1))
13+
(core instance $memory (instantiate $Memory))
14+
(core module $CM
15+
(import "" "mem" (memory 1))
16+
(import "" "future.new" (func $future.new (result i64)))
17+
(import "" "future.write" (func $future.write (param i32 i32) (result i32)))
18+
(import "" "future.close-writable" (func $future.close-writable (param i32)))
19+
20+
(global $fw (mut i32) (i32.const 0))
21+
22+
(func $start-future (export "start-future") (result i32)
23+
;; create a new future, return the readable end to the caller
24+
(local $ret64 i64)
25+
(local.set $ret64 (call $future.new))
26+
(global.set $fw (i32.wrap_i64 (i64.shr_u (local.get $ret64) (i64.const 32))))
27+
(i32.wrap_i64 (local.get $ret64))
28+
)
29+
(func $attempt-write (export "attempt-write") (result i32)
30+
;; because the caller already closed the readable end, this write will eagerly
31+
;; return CLOSED having written no values.
32+
(local $ret i32)
33+
(local.set $ret (call $future.write (global.get $fw) (i32.const 42)))
34+
(if (i32.ne (i32.const 0x01 (; CLOSED=1 | (0<<4) ;)) (local.get $ret))
35+
(then
36+
(i32.load (i32.add (local.get $ret) (i32.const 0x8000_0000)))
37+
unreachable))
38+
39+
;; return without trapping
40+
(i32.const 42)
41+
)
42+
(func $close-writable (export "close-writable")
43+
;; maybe boom
44+
(call $future.close-writable (global.get $fw))
45+
)
46+
)
47+
(type $FT (future u8))
48+
(canon future.new $FT (core func $future.new))
49+
(canon future.write $FT async (memory $memory "mem") (core func $future.write))
50+
(canon future.close-writable $FT (core func $future.close-writable))
51+
(core instance $cm (instantiate $CM (with "" (instance
52+
(export "mem" (memory $memory "mem"))
53+
(export "future.new" (func $future.new))
54+
(export "future.write" (func $future.write))
55+
(export "future.close-writable" (func $future.close-writable))
56+
))))
57+
(func (export "start-future") (result (future u8)) (canon lift (core func $cm "start-future")))
58+
(func (export "attempt-write") (result u32) (canon lift (core func $cm "attempt-write")))
59+
(func (export "close-writable") (canon lift (core func $cm "close-writable")))
60+
)
61+
(component $D
62+
(import "c" (instance $c
63+
(export "start-future" (func (result (future u8))))
64+
(export "attempt-write" (func (result u32)))
65+
(export "close-writable" (func))
66+
))
67+
68+
(core module $Memory (memory (export "mem") 1))
69+
(core instance $memory (instantiate $Memory))
70+
(core module $Core
71+
(import "" "mem" (memory 1))
72+
(import "" "future.read" (func $future.read (param i32 i32) (result i32)))
73+
(import "" "future.close-readable" (func $future.close-readable (param i32)))
74+
(import "" "start-future" (func $start-future (result i32)))
75+
(import "" "attempt-write" (func $attempt-write (result i32)))
76+
(import "" "close-writable" (func $close-writable))
77+
78+
(func $close-readable-future-before-read (export "close-readable-future-before-read") (result i32)
79+
;; call 'start-future' to get the future we'll be working with
80+
(local $fr i32)
81+
(local.set $fr (call $start-future))
82+
(if (i32.ne (i32.const 1) (local.get $fr))
83+
(then unreachable))
84+
85+
;; ok to immediately close the readable end
86+
(call $future.close-readable (local.get $fr))
87+
88+
;; the callee will see that we closed the readable end when it tries to write
89+
(call $attempt-write)
90+
)
91+
(func $close-writable-future-before-write (export "close-writable-future-before-write")
92+
;; call 'start-future' to get the future we'll be working with
93+
(local $fr i32)
94+
(local.set $fr (call $start-future))
95+
(if (i32.ne (i32.const 1) (local.get $fr))
96+
(then unreachable))
97+
98+
;; boom
99+
(call $close-writable)
100+
)
101+
)
102+
(type $FT (future u8))
103+
(canon future.new $FT (core func $future.new))
104+
(canon future.read $FT async (memory $memory "mem") (core func $future.read))
105+
(canon future.close-readable $FT (core func $future.close-readable))
106+
(canon lower (func $c "start-future") (core func $start-future'))
107+
(canon lower (func $c "attempt-write") (core func $attempt-write'))
108+
(canon lower (func $c "close-writable") (core func $close-writable'))
109+
(core instance $core (instantiate $Core (with "" (instance
110+
(export "mem" (memory $memory "mem"))
111+
(export "future.new" (func $future.new))
112+
(export "future.read" (func $future.read))
113+
(export "future.close-readable" (func $future.close-readable))
114+
(export "start-future" (func $start-future'))
115+
(export "attempt-write" (func $attempt-write'))
116+
(export "close-writable" (func $close-writable'))
117+
))))
118+
(func (export "close-readable-future-before-read") (result u32) (canon lift (core func $core "close-readable-future-before-read")))
119+
(func (export "close-writable-future-before-write") (canon lift (core func $core "close-writable-future-before-write")))
120+
)
121+
(instance $c (instantiate $C))
122+
(instance $d (instantiate $D (with "c" (instance $c))))
123+
(func (export "close-writable-future-before-write") (alias export $d "close-writable-future-before-write"))
124+
(func (export "close-readable-future-before-read") (alias export $d "close-readable-future-before-read"))
125+
)
126+
127+
(assert_return (invoke "close-readable-future-before-read") (u32.const 42))
128+
(assert_trap (invoke "close-writable-future-before-write") "cannot close future write end without first writing a value")

0 commit comments

Comments
 (0)