Skip to content
This repository was archived by the owner on Sep 8, 2025. It is now read-only.

Commit 4280e78

Browse files
authored
Merge pull request #148 from bytecodealliance/dicej/fix-137
return pending event if present instead of cancelling reads/writes
2 parents b50fb71 + 07a093b commit 4280e78

4 files changed

Lines changed: 228 additions & 0 deletions

File tree

crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1841,6 +1841,15 @@ impl ComponentInstance {
18411841
let transmit_id = TableId::<TransmitState>::new(rep);
18421842
let transmit = self.get_mut(transmit_id)?;
18431843

1844+
if let Some(event) = Waitable::Transmit(transmit.write_handle).take_event(self)? {
1845+
let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
1846+
unreachable!();
1847+
};
1848+
return Ok(code);
1849+
}
1850+
1851+
let transmit = self.get_mut(transmit_id)?;
1852+
18441853
match &transmit.write {
18451854
WriteState::GuestReady { .. } | WriteState::HostReady { .. } => {
18461855
transmit.write = WriteState::Open;
@@ -1858,6 +1867,15 @@ impl ComponentInstance {
18581867
let transmit_id = TableId::<TransmitState>::new(rep);
18591868
let transmit = self.get_mut(transmit_id)?;
18601869

1870+
if let Some(event) = Waitable::Transmit(transmit.read_handle).take_event(self)? {
1871+
let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
1872+
unreachable!();
1873+
};
1874+
return Ok(code);
1875+
}
1876+
1877+
let transmit = self.get_mut(transmit_id)?;
1878+
18611879
match &transmit.read {
18621880
ReadState::GuestReady { .. } | ReadState::HostReady { .. } => {
18631881
transmit.read = ReadState::Open;
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
;;! component_model_async = true
2+
3+
;; Create a future, start a read, close the write end, and cancel the read.
4+
(component
5+
(type $f (future))
6+
(core func $new (canon future.new $f))
7+
(core module $libc (memory (export "mem") 1))
8+
(core instance $libc (instantiate $libc))
9+
(core func $read (canon future.read $f async (memory $libc "mem")))
10+
(core func $cancel (canon future.cancel-read $f))
11+
(core func $close-write (canon future.close-writable $f))
12+
(core module $m
13+
(import "" "new" (func $new (result i64)))
14+
(import "" "read" (func $read (param i32 i32) (result i32)))
15+
(import "" "cancel" (func $cancel (param i32) (result i32)))
16+
(import "" "close-write" (func $close-write (param i32)))
17+
18+
(func (export "f") (result i32)
19+
(local $read i32)
20+
(local $write i32)
21+
(local $new i64)
22+
23+
(local.set $new (call $new))
24+
(local.set $read (i32.wrap_i64 (local.get $new)))
25+
(local.set $write (i32.wrap_i64 (i64.shr_u (local.get $new) (i64.const 32))))
26+
27+
;; start a read
28+
local.get $read
29+
i32.const 0
30+
call $read
31+
i32.const -1
32+
i32.ne
33+
if unreachable end
34+
35+
;; close the write end
36+
local.get $write
37+
call $close-write
38+
39+
;; cancel the read, returning the result
40+
local.get $read
41+
call $cancel
42+
)
43+
)
44+
45+
(core instance $i (instantiate $m
46+
(with "" (instance
47+
(export "new" (func $new))
48+
(export "read" (func $read))
49+
(export "cancel" (func $cancel))
50+
(export "close-write" (func $close-write))
51+
))
52+
))
53+
54+
(func (export "f") (result u32) (canon lift (core func $i "f")))
55+
)
56+
57+
(assert_return (invoke "f") (u32.const 1)) ;; expect CLOSED status (not CANCELLED)
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
;;! component_model_async = true
2+
3+
;; Create a future, start a write, close the read end, and cancel the write.
4+
(component
5+
(type $f (future))
6+
(core func $new (canon future.new $f))
7+
(core module $libc (memory (export "mem") 1))
8+
(core instance $libc (instantiate $libc))
9+
(core func $write (canon future.write $f async (memory $libc "mem")))
10+
(core func $cancel (canon future.cancel-write $f))
11+
(core func $close-read (canon future.close-readable $f))
12+
(core module $m
13+
(import "" "new" (func $new (result i64)))
14+
(import "" "write" (func $write (param i32 i32) (result i32)))
15+
(import "" "cancel" (func $cancel (param i32) (result i32)))
16+
(import "" "close-read" (func $close-read (param i32)))
17+
18+
(func (export "f") (result i32)
19+
(local $read i32)
20+
(local $write i32)
21+
(local $new i64)
22+
23+
(local.set $new (call $new))
24+
(local.set $read (i32.wrap_i64 (local.get $new)))
25+
(local.set $write (i32.wrap_i64 (i64.shr_u (local.get $new) (i64.const 32))))
26+
27+
;; start a write
28+
local.get $write
29+
i32.const 0
30+
call $write
31+
i32.const -1
32+
i32.ne
33+
if unreachable end
34+
35+
;; close the read end
36+
local.get $read
37+
call $close-read
38+
39+
;; cancel the write, returning the result
40+
local.get $write
41+
call $cancel
42+
)
43+
)
44+
45+
(core instance $i (instantiate $m
46+
(with "" (instance
47+
(export "new" (func $new))
48+
(export "write" (func $write))
49+
(export "cancel" (func $cancel))
50+
(export "close-read" (func $close-read))
51+
))
52+
))
53+
54+
(func (export "f") (result u32) (canon lift (core func $i "f")))
55+
)
56+
57+
(assert_return (invoke "f") (u32.const 1)) ;; expect CLOSED status (not CANCELLED)
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
;;! component_model_async = true
2+
3+
;; Create a future, start a write, let it complete, and cancel the write prior
4+
;; to receiving the completion event.
5+
(component
6+
(type $f (future))
7+
8+
(component $c
9+
(type $f (future))
10+
11+
(core module $libc (memory (export "mem") 1))
12+
(core instance $libc (instantiate $libc))
13+
(core func $read (canon future.read $f async (memory $libc "mem")))
14+
(core func $close-read (canon future.close-readable $f))
15+
(core module $inner
16+
(import "" "read" (func $read (param i32 i32) (result i32)))
17+
(import "" "close-read" (func $close-read (param i32)))
18+
19+
(func (export "f") (param i32)
20+
;; start a read, asserting it completes with one item
21+
local.get 0
22+
i32.const 0
23+
call $read
24+
i32.const 0x10
25+
i32.ne
26+
if unreachable end
27+
28+
;; close the read end
29+
local.get 0
30+
call $close-read
31+
)
32+
)
33+
34+
(core instance $i (instantiate $inner
35+
(with "" (instance
36+
(export "read" (func $read))
37+
(export "close-read" (func $close-read))
38+
))
39+
))
40+
41+
(func (export "f") (param "x" $f) (canon lift (core func $i "f")))
42+
)
43+
(instance $c (instantiate $c))
44+
45+
(core func $new (canon future.new $f))
46+
(core module $libc (memory (export "mem") 1))
47+
(core instance $libc (instantiate $libc))
48+
(core func $write (canon future.write $f async (memory $libc "mem")))
49+
(core func $cancel (canon future.cancel-write $f))
50+
(core func $drain (canon lower (func $c "f")))
51+
(core module $m
52+
(import "" "new" (func $new (result i64)))
53+
(import "" "write" (func $write (param i32 i32) (result i32)))
54+
(import "" "cancel" (func $cancel (param i32) (result i32)))
55+
(import "" "drain" (func $drain (param i32)))
56+
57+
(func (export "f") (result i32)
58+
(local $read i32)
59+
(local $write i32)
60+
(local $new i64)
61+
62+
(local.set $new (call $new))
63+
(local.set $read (i32.wrap_i64 (local.get $new)))
64+
(local.set $write (i32.wrap_i64 (i64.shr_u (local.get $new) (i64.const 32))))
65+
66+
;; start a write
67+
local.get $write
68+
i32.const 0
69+
call $write
70+
i32.const -1
71+
i32.ne
72+
if unreachable end
73+
74+
;; drain the read end
75+
local.get $read
76+
call $drain
77+
78+
;; cancel the write, returning the result
79+
local.get $write
80+
call $cancel
81+
)
82+
)
83+
84+
(core instance $i (instantiate $m
85+
(with "" (instance
86+
(export "new" (func $new))
87+
(export "write" (func $write))
88+
(export "cancel" (func $cancel))
89+
(export "drain" (func $drain))
90+
))
91+
))
92+
93+
(func (export "f") (result u32) (canon lift (core func $i "f")))
94+
)
95+
96+
(assert_return (invoke "f") (u32.const 0x10))

0 commit comments

Comments
 (0)