Skip to content
This repository was archived by the owner on Sep 8, 2025. It is now read-only.

Commit 8f7decf

Browse files
dicejlukewagner
andcommitted
return correct code from {stream,future}.cancel-{write,read}
Fixes #164 Co-authored-by: Luke Wagner <mail@lukewagner.name> Signed-off-by: Joel Dice <joel.dice@fermyon.com>
1 parent f25db47 commit 8f7decf

2 files changed

Lines changed: 230 additions & 8 deletions

File tree

crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2056,12 +2056,20 @@ impl ComponentInstance {
20562056
let transmit_id = TableId::<TransmitState>::new(rep);
20572057
let transmit = self.get_mut(transmit_id)?;
20582058

2059-
if let Some(event) = Waitable::Transmit(transmit.write_handle).take_event(self)? {
2059+
let code = if let Some(event) =
2060+
Waitable::Transmit(transmit.write_handle).take_event(self)?
2061+
{
20602062
let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
20612063
unreachable!();
20622064
};
2063-
return Ok(code);
2064-
}
2065+
match code {
2066+
ReturnCode::Completed(count) => ReturnCode::Cancelled(count),
2067+
ReturnCode::Closed(_) => code,
2068+
_ => unreachable!(),
2069+
}
2070+
} else {
2071+
ReturnCode::Cancelled(0)
2072+
};
20652073

20662074
let transmit = self.get_mut(transmit_id)?;
20672075

@@ -2075,7 +2083,7 @@ impl ComponentInstance {
20752083

20762084
log::trace!("cancelled write {transmit_id:?}");
20772085

2078-
Ok(ReturnCode::Cancelled(0))
2086+
Ok(code)
20792087
}
20802088

20812089
/// Cancel a pending stream or future read from the host.
@@ -2087,12 +2095,18 @@ impl ComponentInstance {
20872095
let transmit_id = TableId::<TransmitState>::new(rep);
20882096
let transmit = self.get_mut(transmit_id)?;
20892097

2090-
if let Some(event) = Waitable::Transmit(transmit.read_handle).take_event(self)? {
2098+
let code = if let Some(event) = Waitable::Transmit(transmit.read_handle).take_event(self)? {
20912099
let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
20922100
unreachable!();
20932101
};
2094-
return Ok(code);
2095-
}
2102+
match code {
2103+
ReturnCode::Completed(count) => ReturnCode::Cancelled(count),
2104+
ReturnCode::Closed(_) => code,
2105+
_ => unreachable!(),
2106+
}
2107+
} else {
2108+
ReturnCode::Cancelled(0)
2109+
};
20962110

20972111
let transmit = self.get_mut(transmit_id)?;
20982112

@@ -2106,7 +2120,7 @@ impl ComponentInstance {
21062120

21072121
log::trace!("cancelled read {transmit_id:?}");
21082122

2109-
Ok(ReturnCode::Cancelled(0))
2123+
Ok(code)
21102124
}
21112125

21122126
/// Close the write end of a stream or future read from the host.
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
;;! component_model_async = true
2+
3+
;; This test contains two components $C and $D that test cancelling reads
4+
;; and writes in the presence and absence of partial reads/writes.
5+
;;
6+
;; $C exports a function 'start-stream' that creates and holds onto a writable
7+
;; stream in the global $sw as well as various operations that operate on $sw.
8+
;; $D calls $C.start-stream to get the readable end and then drives the test.
9+
;;
10+
;; (Copied from
11+
;; https://github.com/WebAssembly/component-model/blob/add-tests/test/concurrency/cancel-stream.wast)
12+
(component
13+
(component $C
14+
(core module $Memory (memory (export "mem") 1))
15+
(core instance $memory (instantiate $Memory))
16+
(core module $CM
17+
(import "" "mem" (memory 1))
18+
(import "" "task.return" (func $task.return (param i32)))
19+
(import "" "stream.new" (func $stream.new (result i64)))
20+
(import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32)))
21+
(import "" "stream.cancel-write" (func $stream.cancel-write (param i32) (result i32)))
22+
(import "" "stream.close-writable" (func $stream.close-writable (param i32)))
23+
24+
(global $sw (mut i32) (i32.const 0))
25+
26+
(func $start-stream (export "start-stream") (result i32)
27+
;; create a new stream, return the readable end to the caller
28+
(local $ret64 i64)
29+
(local.set $ret64 (call $stream.new))
30+
(global.set $sw (i32.wrap_i64 (i64.shr_u (local.get $ret64) (i64.const 32))))
31+
(i32.wrap_i64 (local.get $ret64))
32+
)
33+
34+
(func $write4 (export "write4")
35+
;; write 6 bytes into the stream, expecting to rendezvous with a stream.read
36+
(local $ret i32)
37+
(i32.store (i32.const 8) (i32.const 0xabcd))
38+
(local.set $ret (call $stream.write (global.get $sw) (i32.const 8) (i32.const 4)))
39+
(if (i32.ne (i32.const 0x40 (; COMPLETED=0 | (4<<4) ;)) (local.get $ret))
40+
(then unreachable))
41+
)
42+
43+
(func $write4-and-close (export "write4-and-close")
44+
(call $write4)
45+
(call $stream.close-writable (global.get $sw))
46+
)
47+
48+
(func $start-blocking-write (export "start-blocking-write")
49+
(local $ret i32)
50+
51+
;; prepare the write buffer
52+
(i64.store (i32.const 8) (i64.const 0x123456789abcdef))
53+
54+
;; start one blocking write and immediately cancel it
55+
(local.set $ret (call $stream.write (global.get $sw) (i32.const 8) (i32.const 8)))
56+
(if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret))
57+
(then unreachable))
58+
(local.set $ret (call $stream.cancel-write (global.get $sw)))
59+
(if (i32.ne (i32.const 0x2 (; CANCELLED ;)) (local.get $ret))
60+
(then unreachable))
61+
62+
;; start a second blockign write and leave it pending
63+
(local.set $ret (call $stream.write (global.get $sw) (i32.const 8) (i32.const 8)))
64+
(if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret))
65+
(then unreachable))
66+
)
67+
68+
(func $cancel-after-read4 (export "cancel-after-read4")
69+
(local $ret i32)
70+
(local.set $ret (call $stream.cancel-write (global.get $sw)))
71+
(if (i32.ne (i32.const 0x42 (; CANCELLED=2 | (4<<4) ;)) (local.get $ret))
72+
(then unreachable))
73+
)
74+
)
75+
(type $ST (stream u8))
76+
(canon task.return (result u32) (core func $task.return))
77+
(canon stream.new $ST (core func $stream.new))
78+
(canon stream.write $ST async (memory $memory "mem") (core func $stream.write))
79+
(canon stream.cancel-write $ST (core func $stream.cancel-write))
80+
(canon stream.close-writable $ST (core func $stream.close-writable))
81+
(core instance $cm (instantiate $CM (with "" (instance
82+
(export "mem" (memory $memory "mem"))
83+
(export "task.return" (func $task.return))
84+
(export "stream.new" (func $stream.new))
85+
(export "stream.write" (func $stream.write))
86+
(export "stream.cancel-write" (func $stream.cancel-write))
87+
(export "stream.close-writable" (func $stream.close-writable))
88+
))))
89+
(func (export "start-stream") (result (stream u8)) (canon lift (core func $cm "start-stream")))
90+
(func (export "write4") (canon lift (core func $cm "write4")))
91+
(func (export "write4-and-close") (canon lift (core func $cm "write4-and-close")))
92+
(func (export "start-blocking-write") (canon lift (core func $cm "start-blocking-write")))
93+
(func (export "cancel-after-read4") (canon lift (core func $cm "cancel-after-read4")))
94+
)
95+
96+
(component $D
97+
(import "c" (instance $c
98+
(export "start-stream" (func (result (stream u8))))
99+
(export "write4" (func))
100+
(export "write4-and-close" (func))
101+
(export "start-blocking-write" (func))
102+
(export "cancel-after-read4" (func))
103+
))
104+
105+
(core module $Memory (memory (export "mem") 1))
106+
(core instance $memory (instantiate $Memory))
107+
(core module $DM
108+
(import "" "mem" (memory 1))
109+
(import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32)))
110+
(import "" "stream.cancel-read" (func $stream.cancel-read (param i32) (result i32)))
111+
(import "" "stream.close-readable" (func $stream.close-readable (param i32)))
112+
(import "" "start-stream" (func $start-stream (result i32)))
113+
(import "" "write4" (func $write4))
114+
(import "" "write4-and-close" (func $write4-and-close))
115+
(import "" "start-blocking-write" (func $start-blocking-write))
116+
(import "" "cancel-after-read4" (func $cancel-after-read4))
117+
118+
(func $run (export "run") (result i32)
119+
(local $ret i32)
120+
(local $sr i32)
121+
122+
;; call 'start-stream' to get the stream we'll be working with
123+
(local.set $sr (call $start-stream))
124+
(if (i32.ne (i32.const 1) (local.get $sr))
125+
(then unreachable))
126+
127+
;; start read that will block
128+
(local.set $ret (call $stream.read (local.get $sr) (i32.const 8) (i32.const 100)))
129+
(if (i32.ne (i32.const -1 (; BLOCKED;)) (local.get $ret))
130+
(then unreachable))
131+
132+
;; cancelling it will finish without anything having been written
133+
(local.set $ret (call $stream.cancel-read (local.get $sr)))
134+
(if (i32.ne (i32.const 0x2 (; CANCELLED ;)) (local.get $ret))
135+
(then unreachable))
136+
137+
;; read, block, call $C to write 4 bytes into the buffer,
138+
;; then cancel, which should show "4+cancelled"
139+
(local.set $ret (call $stream.read (local.get $sr) (i32.const 8) (i32.const 100)))
140+
(if (i32.ne (i32.const -1 (; BLOCKED;)) (local.get $ret))
141+
(then unreachable))
142+
(call $write4)
143+
(local.set $ret (call $stream.cancel-read (local.get $sr)))
144+
(if (i32.ne (i32.const 0x42 (; CANCELLED=2 | (4<<4) ;)) (local.get $ret))
145+
(then unreachable))
146+
(if (i32.ne (i32.const 0xabcd) (i32.load (i32.const 8)))
147+
(then unreachable))
148+
149+
;; read, block, call $C to write 4 bytes into the buffer and close,
150+
;; then cancel, which should show "4+closed"
151+
(local.set $ret (call $stream.read (local.get $sr) (i32.const 8) (i32.const 100)))
152+
(if (i32.ne (i32.const -1 (; BLOCKED;)) (local.get $ret))
153+
(then unreachable))
154+
(call $write4-and-close)
155+
(local.set $ret (call $stream.cancel-read (local.get $sr)))
156+
(if (i32.ne (i32.const 0x41 (; CLOSED=1 | (4<<4) ;)) (local.get $ret))
157+
(then unreachable))
158+
(if (i32.ne (i32.const 0xabcd) (i32.load (i32.const 8)))
159+
(then unreachable))
160+
(call $stream.close-readable (local.get $sr))
161+
162+
;; get a new $sr
163+
(local.set $sr (call $start-stream))
164+
(if (i32.ne (i32.const 1) (local.get $sr))
165+
(then unreachable))
166+
167+
;; start outstanding write in $C, read 4 of it, then call back into $C
168+
;; which will cancel and see 4 written.
169+
(call $start-blocking-write)
170+
(local.set $ret (call $stream.read (local.get $sr) (i32.const 8) (i32.const 4)))
171+
(if (i32.ne (i32.const 0x40 (; COMPLETED=0 | (4<<4) ;)) (local.get $ret))
172+
(then unreachable))
173+
(if (i32.ne (i32.const 0x89abcdef) (i32.load (i32.const 8)))
174+
(then unreachable))
175+
(call $cancel-after-read4)
176+
177+
;; return 42 to the top-level assert_return
178+
(i32.const 42)
179+
)
180+
)
181+
(type $ST (stream u8))
182+
(canon stream.read $ST async (memory $memory "mem") (core func $stream.read))
183+
(canon stream.cancel-read $ST (core func $stream.cancel-read))
184+
(canon stream.close-readable $ST (core func $stream.close-readable))
185+
(canon lower (func $c "start-stream") (core func $start-stream'))
186+
(canon lower (func $c "write4") (core func $write4'))
187+
(canon lower (func $c "write4-and-close") (core func $write4-and-close'))
188+
(canon lower (func $c "start-blocking-write") (core func $start-blocking-write'))
189+
(canon lower (func $c "cancel-after-read4") (core func $cancel-after-read4'))
190+
(core instance $dm (instantiate $DM (with "" (instance
191+
(export "mem" (memory $memory "mem"))
192+
(export "stream.read" (func $stream.read))
193+
(export "stream.cancel-read" (func $stream.cancel-read))
194+
(export "stream.close-readable" (func $stream.close-readable))
195+
(export "start-stream" (func $start-stream'))
196+
(export "write4" (func $write4'))
197+
(export "write4-and-close" (func $write4-and-close'))
198+
(export "start-blocking-write" (func $start-blocking-write'))
199+
(export "cancel-after-read4" (func $cancel-after-read4'))
200+
))))
201+
(func (export "run") (result u32) (canon lift (core func $dm "run")))
202+
)
203+
204+
(instance $c (instantiate $C))
205+
(instance $d (instantiate $D (with "c" (instance $c))))
206+
(func (export "run") (alias export $d "run"))
207+
)
208+
(assert_return (invoke "run") (u32.const 42))

0 commit comments

Comments
 (0)