Skip to content
This repository was archived by the owner on Sep 8, 2025. It is now read-only.

Commit 6c01093

Browse files
committed
return pending event if present instead of cancelling reads/writes
Fixes #137 Signed-off-by: Joel Dice <joel.dice@fermyon.com>
1 parent 1280db7 commit 6c01093

3 files changed

Lines changed: 138 additions & 0 deletions

File tree

crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1841,6 +1841,15 @@ impl ComponentInstance {
18411841
let transmit_id = TableId::<TransmitState>::new(rep);
18421842
let transmit = self.get_mut(transmit_id)?;
18431843

1844+
if let Some(event) = Waitable::Transmit(transmit.write_handle).take_event(self)? {
1845+
let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
1846+
unreachable!();
1847+
};
1848+
return Ok(code);
1849+
}
1850+
1851+
let transmit = self.get_mut(transmit_id)?;
1852+
18441853
match &transmit.write {
18451854
WriteState::GuestReady { .. } | WriteState::HostReady { .. } => {
18461855
transmit.write = WriteState::Open;
@@ -1858,6 +1867,15 @@ impl ComponentInstance {
18581867
let transmit_id = TableId::<TransmitState>::new(rep);
18591868
let transmit = self.get_mut(transmit_id)?;
18601869

1870+
if let Some(event) = Waitable::Transmit(transmit.read_handle).take_event(self)? {
1871+
let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
1872+
unreachable!();
1873+
};
1874+
return Ok(code);
1875+
}
1876+
1877+
let transmit = self.get_mut(transmit_id)?;
1878+
18611879
match &transmit.read {
18621880
ReadState::GuestReady { .. } | ReadState::HostReady { .. } => {
18631881
transmit.read = ReadState::Open;
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
;;! component_model_async = true
2+
;;! reference_types = true
3+
;;! gc_types = true
4+
;;! multi_memory = true
5+
6+
;; Create a future, start a read, close the write end, and cancel the read.
7+
(component
8+
(type $f (future))
9+
(core func $new (canon future.new $f))
10+
(core module $libc (memory (export "mem") 1))
11+
(core instance $libc (instantiate $libc))
12+
(core func $read (canon future.read $f async (memory $libc "mem")))
13+
(core func $cancel (canon future.cancel-read $f))
14+
(core func $close-write (canon future.close-writable $f))
15+
(core module $m
16+
(import "" "new" (func $new (result i64)))
17+
(import "" "read" (func $read (param i32 i32) (result i32)))
18+
(import "" "cancel" (func $cancel (param i32) (result i32)))
19+
(import "" "close-write" (func $close-write (param i32)))
20+
21+
(func (export "f") (result i32)
22+
(local $read i32)
23+
(local $write i32)
24+
(local $new i64)
25+
26+
(local.set $new (call $new))
27+
(local.set $read (i32.wrap_i64 (local.get $new)))
28+
(local.set $write (i32.wrap_i64 (i64.shr_u (local.get $new) (i64.const 32))))
29+
30+
;; start a read
31+
local.get $read
32+
i32.const 0
33+
call $read
34+
i32.const -1
35+
i32.ne
36+
if unreachable end
37+
38+
;; close the write end
39+
local.get $write
40+
call $close-write
41+
42+
;; cancel the read, returning the result
43+
local.get $read
44+
call $cancel
45+
)
46+
)
47+
48+
(core instance $i (instantiate $m
49+
(with "" (instance
50+
(export "new" (func $new))
51+
(export "read" (func $read))
52+
(export "cancel" (func $cancel))
53+
(export "close-write" (func $close-write))
54+
))
55+
))
56+
57+
(func (export "f") (result u32) (canon lift (core func $i "f")))
58+
)
59+
60+
(assert_return (invoke "f") (u32.const 1)) ;; expect CLOSED status (not CANCELLED)
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
;;! component_model_async = true
2+
;;! reference_types = true
3+
;;! gc_types = true
4+
;;! multi_memory = true
5+
6+
;; Create a future, start a write, close the read end, and cancel the write.
7+
(component
8+
(type $f (future))
9+
(core func $new (canon future.new $f))
10+
(core module $libc (memory (export "mem") 1))
11+
(core instance $libc (instantiate $libc))
12+
(core func $write (canon future.write $f async (memory $libc "mem")))
13+
(core func $cancel (canon future.cancel-write $f))
14+
(core func $close-read (canon future.close-readable $f))
15+
(core module $m
16+
(import "" "new" (func $new (result i64)))
17+
(import "" "write" (func $write (param i32 i32) (result i32)))
18+
(import "" "cancel" (func $cancel (param i32) (result i32)))
19+
(import "" "close-read" (func $close-read (param i32)))
20+
21+
(func (export "f") (result i32)
22+
(local $read i32)
23+
(local $write i32)
24+
(local $new i64)
25+
26+
(local.set $new (call $new))
27+
(local.set $read (i32.wrap_i64 (local.get $new)))
28+
(local.set $write (i32.wrap_i64 (i64.shr_u (local.get $new) (i64.const 32))))
29+
30+
;; start a write
31+
local.get $write
32+
i32.const 0
33+
call $write
34+
i32.const -1
35+
i32.ne
36+
if unreachable end
37+
38+
;; close the read end
39+
local.get $read
40+
call $close-read
41+
42+
;; cancel the write, returning the result
43+
local.get $write
44+
call $cancel
45+
)
46+
)
47+
48+
(core instance $i (instantiate $m
49+
(with "" (instance
50+
(export "new" (func $new))
51+
(export "write" (func $write))
52+
(export "cancel" (func $cancel))
53+
(export "close-read" (func $close-read))
54+
))
55+
))
56+
57+
(func (export "f") (result u32) (canon lift (core func $i "f")))
58+
)
59+
60+
(assert_return (invoke "f") (u32.const 1)) ;; expect CLOSED status (not CANCELLED)

0 commit comments

Comments
 (0)