diff --git a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs index 8eb8d4bee2..a04e75dc7d 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs @@ -1841,6 +1841,15 @@ impl ComponentInstance { let transmit_id = TableId::::new(rep); let transmit = self.get_mut(transmit_id)?; + if let Some(event) = Waitable::Transmit(transmit.write_handle).take_event(self)? { + let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else { + unreachable!(); + }; + return Ok(code); + } + + let transmit = self.get_mut(transmit_id)?; + match &transmit.write { WriteState::GuestReady { .. } | WriteState::HostReady { .. } => { transmit.write = WriteState::Open; @@ -1858,6 +1867,15 @@ impl ComponentInstance { let transmit_id = TableId::::new(rep); let transmit = self.get_mut(transmit_id)?; + if let Some(event) = Waitable::Transmit(transmit.read_handle).take_event(self)? { + let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else { + unreachable!(); + }; + return Ok(code); + } + + let transmit = self.get_mut(transmit_id)?; + match &transmit.read { ReadState::GuestReady { .. } | ReadState::HostReady { .. } => { transmit.read = ReadState::Open; diff --git a/tests/misc_testsuite/component-model-async/future-cancel-read-closed.wast b/tests/misc_testsuite/component-model-async/future-cancel-read-closed.wast new file mode 100644 index 0000000000..f7a88adde9 --- /dev/null +++ b/tests/misc_testsuite/component-model-async/future-cancel-read-closed.wast @@ -0,0 +1,57 @@ +;;! component_model_async = true + +;; Create a future, start a read, close the write end, and cancel the read. +(component + (type $f (future)) + (core func $new (canon future.new $f)) + (core module $libc (memory (export "mem") 1)) + (core instance $libc (instantiate $libc)) + (core func $read (canon future.read $f async (memory $libc "mem"))) + (core func $cancel (canon future.cancel-read $f)) + (core func $close-write (canon future.close-writable $f)) + (core module $m + (import "" "new" (func $new (result i64))) + (import "" "read" (func $read (param i32 i32) (result i32))) + (import "" "cancel" (func $cancel (param i32) (result i32))) + (import "" "close-write" (func $close-write (param i32))) + + (func (export "f") (result i32) + (local $read i32) + (local $write i32) + (local $new i64) + + (local.set $new (call $new)) + (local.set $read (i32.wrap_i64 (local.get $new))) + (local.set $write (i32.wrap_i64 (i64.shr_u (local.get $new) (i64.const 32)))) + + ;; start a read + local.get $read + i32.const 0 + call $read + i32.const -1 + i32.ne + if unreachable end + + ;; close the write end + local.get $write + call $close-write + + ;; cancel the read, returning the result + local.get $read + call $cancel + ) + ) + + (core instance $i (instantiate $m + (with "" (instance + (export "new" (func $new)) + (export "read" (func $read)) + (export "cancel" (func $cancel)) + (export "close-write" (func $close-write)) + )) + )) + + (func (export "f") (result u32) (canon lift (core func $i "f"))) +) + +(assert_return (invoke "f") (u32.const 1)) ;; expect CLOSED status (not CANCELLED) diff --git a/tests/misc_testsuite/component-model-async/future-cancel-write-closed.wast b/tests/misc_testsuite/component-model-async/future-cancel-write-closed.wast new file mode 100644 index 0000000000..0cca24a2a4 --- /dev/null +++ b/tests/misc_testsuite/component-model-async/future-cancel-write-closed.wast @@ -0,0 +1,57 @@ +;;! component_model_async = true + +;; Create a future, start a write, close the read end, and cancel the write. +(component + (type $f (future)) + (core func $new (canon future.new $f)) + (core module $libc (memory (export "mem") 1)) + (core instance $libc (instantiate $libc)) + (core func $write (canon future.write $f async (memory $libc "mem"))) + (core func $cancel (canon future.cancel-write $f)) + (core func $close-read (canon future.close-readable $f)) + (core module $m + (import "" "new" (func $new (result i64))) + (import "" "write" (func $write (param i32 i32) (result i32))) + (import "" "cancel" (func $cancel (param i32) (result i32))) + (import "" "close-read" (func $close-read (param i32))) + + (func (export "f") (result i32) + (local $read i32) + (local $write i32) + (local $new i64) + + (local.set $new (call $new)) + (local.set $read (i32.wrap_i64 (local.get $new))) + (local.set $write (i32.wrap_i64 (i64.shr_u (local.get $new) (i64.const 32)))) + + ;; start a write + local.get $write + i32.const 0 + call $write + i32.const -1 + i32.ne + if unreachable end + + ;; close the read end + local.get $read + call $close-read + + ;; cancel the write, returning the result + local.get $write + call $cancel + ) + ) + + (core instance $i (instantiate $m + (with "" (instance + (export "new" (func $new)) + (export "write" (func $write)) + (export "cancel" (func $cancel)) + (export "close-read" (func $close-read)) + )) + )) + + (func (export "f") (result u32) (canon lift (core func $i "f"))) +) + +(assert_return (invoke "f") (u32.const 1)) ;; expect CLOSED status (not CANCELLED) diff --git a/tests/misc_testsuite/component-model-async/future-cancel-write-completed.wast b/tests/misc_testsuite/component-model-async/future-cancel-write-completed.wast new file mode 100644 index 0000000000..fa9b177391 --- /dev/null +++ b/tests/misc_testsuite/component-model-async/future-cancel-write-completed.wast @@ -0,0 +1,96 @@ +;;! component_model_async = true + +;; Create a future, start a write, let it complete, and cancel the write prior +;; to receiving the completion event. +(component + (type $f (future)) + + (component $c + (type $f (future)) + + (core module $libc (memory (export "mem") 1)) + (core instance $libc (instantiate $libc)) + (core func $read (canon future.read $f async (memory $libc "mem"))) + (core func $close-read (canon future.close-readable $f)) + (core module $inner + (import "" "read" (func $read (param i32 i32) (result i32))) + (import "" "close-read" (func $close-read (param i32))) + + (func (export "f") (param i32) + ;; start a read, asserting it completes with one item + local.get 0 + i32.const 0 + call $read + i32.const 0x10 + i32.ne + if unreachable end + + ;; close the read end + local.get 0 + call $close-read + ) + ) + + (core instance $i (instantiate $inner + (with "" (instance + (export "read" (func $read)) + (export "close-read" (func $close-read)) + )) + )) + + (func (export "f") (param "x" $f) (canon lift (core func $i "f"))) + ) + (instance $c (instantiate $c)) + + (core func $new (canon future.new $f)) + (core module $libc (memory (export "mem") 1)) + (core instance $libc (instantiate $libc)) + (core func $write (canon future.write $f async (memory $libc "mem"))) + (core func $cancel (canon future.cancel-write $f)) + (core func $drain (canon lower (func $c "f"))) + (core module $m + (import "" "new" (func $new (result i64))) + (import "" "write" (func $write (param i32 i32) (result i32))) + (import "" "cancel" (func $cancel (param i32) (result i32))) + (import "" "drain" (func $drain (param i32))) + + (func (export "f") (result i32) + (local $read i32) + (local $write i32) + (local $new i64) + + (local.set $new (call $new)) + (local.set $read (i32.wrap_i64 (local.get $new))) + (local.set $write (i32.wrap_i64 (i64.shr_u (local.get $new) (i64.const 32)))) + + ;; start a write + local.get $write + i32.const 0 + call $write + i32.const -1 + i32.ne + if unreachable end + + ;; drain the read end + local.get $read + call $drain + + ;; cancel the write, returning the result + local.get $write + call $cancel + ) + ) + + (core instance $i (instantiate $m + (with "" (instance + (export "new" (func $new)) + (export "write" (func $write)) + (export "cancel" (func $cancel)) + (export "drain" (func $drain)) + )) + )) + + (func (export "f") (result u32) (canon lift (core func $i "f"))) +) + +(assert_return (invoke "f") (u32.const 0x10))