@@ -1416,6 +1416,8 @@ struct TransmitState {
14161416 writer_watcher : Option < oneshot:: Sender < ( ) > > ,
14171417 /// Like `writer_watcher`, but for the reverse direction.
14181418 reader_watcher : Option < oneshot:: Sender < ( ) > > ,
1419+ /// Whether the write end may be closed or not.
1420+ may_close_writer : bool ,
14191421}
14201422
14211423impl Default for TransmitState {
@@ -1427,6 +1429,7 @@ impl Default for TransmitState {
14271429 write : WriteState :: Open ,
14281430 reader_watcher : None ,
14291431 writer_watcher : None ,
1432+ may_close_writer : true ,
14301433 }
14311434 }
14321435}
@@ -1550,7 +1553,7 @@ impl Instance {
15501553 store
15511554 . as_context_mut ( )
15521555 . with_detached_instance ( self , |mut store, instance| {
1553- let ( write, read) = instance. new_transmit ( ) ?;
1556+ let ( write, read) = instance. new_transmit ( TransmitKind :: Future ) ?;
15541557
15551558 Ok ( (
15561559 FutureWriter :: new (
@@ -1591,7 +1594,7 @@ impl Instance {
15911594 store
15921595 . as_context_mut ( )
15931596 . with_detached_instance ( self , |mut store, instance| {
1594- let ( write, read) = instance. new_transmit ( ) ?;
1597+ let ( write, read) = instance. new_transmit ( TransmitKind :: Stream ) ?;
15951598
15961599 Ok ( (
15971600 StreamWriter :: new (
@@ -1899,7 +1902,10 @@ impl ComponentInstance {
18991902
19001903 /// Allocate a new future or stream, including the `TransmitState` and the
19011904 /// `TransmitHandle`s corresponding to the read and write ends.
1902- fn new_transmit ( & mut self ) -> Result < ( TableId < TransmitHandle > , TableId < TransmitHandle > ) > {
1905+ fn new_transmit (
1906+ & mut self ,
1907+ kind : TransmitKind ,
1908+ ) -> Result < ( TableId < TransmitHandle > , TableId < TransmitHandle > ) > {
19031909 let state_id = self . push ( TransmitState :: default ( ) ) ?;
19041910
19051911 let write = self . push ( TransmitHandle :: new ( state_id) ) ?;
@@ -1909,6 +1915,10 @@ impl ComponentInstance {
19091915 state. write_handle = write;
19101916 state. read_handle = read;
19111917
1918+ if let TransmitKind :: Future = kind {
1919+ state. may_close_writer = false ;
1920+ }
1921+
19121922 log:: trace!( "new transmit: state {state_id:?}; write {write:?}; read {read:?}" , ) ;
19131923
19141924 Ok ( ( write, read) )
@@ -1941,7 +1951,10 @@ impl ComponentInstance {
19411951 /// write ends to the (sub-)component instance to which the specified
19421952 /// `TableIndex` belongs.
19431953 fn guest_new ( & mut self , ty : TableIndex ) -> Result < ResourcePair > {
1944- let ( write, read) = self . new_transmit ( ) ?;
1954+ let ( write, read) = self . new_transmit ( match ty {
1955+ TableIndex :: Future ( _) => TransmitKind :: Future ,
1956+ TableIndex :: Stream ( _) => TransmitKind :: Stream ,
1957+ } ) ?;
19451958 let read = self
19461959 . state_table ( ty)
19471960 . insert ( read. rep ( ) , waitable_state ( ty, StreamFutureState :: Read ) ) ?;
@@ -1975,6 +1988,7 @@ impl ComponentInstance {
19751988 let transmit = self
19761989 . get_mut ( transmit_id)
19771990 . with_context ( || format ! ( "retrieving state for transmit [{transmit_rep}]" ) ) ?;
1991+ transmit. may_close_writer = true ;
19781992
19791993 let new_state = if let ReadState :: Closed = & transmit. read {
19801994 ReadState :: Closed
@@ -2176,8 +2190,11 @@ impl ComponentInstance {
21762190
21772191 /// Cancel a pending stream or future write from the host.
21782192 ///
2179- /// `rep` is the `TransmitState` rep for the stream or future.
2180- fn host_cancel_write ( & mut self , rep : u32 ) -> Result < ReturnCode > {
2193+ /// # Arguments
2194+ ///
2195+ /// * `rep` - The `TransmitState` rep for the stream or future.
2196+ /// * `kind` - Whether `rep` is for a stream or a future.
2197+ fn host_cancel_write ( & mut self , rep : u32 , kind : TransmitKind ) -> Result < ReturnCode > {
21812198 let transmit_id = TableId :: < TransmitState > :: new ( rep) ;
21822199 let transmit = self . get_mut ( transmit_id) ?;
21832200
@@ -2208,6 +2225,10 @@ impl ComponentInstance {
22082225
22092226 log:: trace!( "cancelled write {transmit_id:?}" ) ;
22102227
2228+ if let ( TransmitKind :: Future , ReturnCode :: Cancelled ( 0 ) ) = ( kind, code) {
2229+ transmit. may_close_writer = false ;
2230+ }
2231+
22112232 Ok ( code)
22122233 }
22132234
@@ -2259,6 +2280,10 @@ impl ComponentInstance {
22592280 . get_mut ( transmit_id)
22602281 . with_context ( || format ! ( "error closing writer {transmit_rep}" ) ) ?;
22612282
2283+ if !transmit. may_close_writer {
2284+ bail ! ( "cannot close future write end without first writing a value" )
2285+ }
2286+
22622287 transmit. writer_watcher = None ;
22632288
22642289 // Existing queued transmits must be updated with information for the impending writer closure
@@ -2652,6 +2677,7 @@ impl ComponentInstance {
26522677 let transmit_id = self . get ( transmit_handle) ?. state ;
26532678 log:: trace!( "guest_write {transmit_handle:?} (handle {handle}; state {transmit_id:?})" , ) ;
26542679 let transmit = self . get_mut ( transmit_id) ?;
2680+ transmit. may_close_writer = true ;
26552681 let new_state = if let ReadState :: Closed = & transmit. read {
26562682 ReadState :: Closed
26572683 } else {
@@ -3018,10 +3044,11 @@ impl ComponentInstance {
30183044 writer : u32 ,
30193045 _async_ : bool ,
30203046 ) -> Result < ReturnCode > {
3021- let ( rep, WaitableState :: Stream ( _, state) | WaitableState :: Future ( _, state) ) =
3022- self . state_table ( ty) . get_mut_by_index ( writer) ?
3023- else {
3024- bail ! ( "invalid stream or future handle" ) ;
3047+ let ( rep, state) = self . state_table ( ty) . get_mut_by_index ( writer) ?;
3048+ let ( state, kind) = match state {
3049+ WaitableState :: Stream ( _, state) => ( state, TransmitKind :: Stream ) ,
3050+ WaitableState :: Future ( _, state) => ( state, TransmitKind :: Future ) ,
3051+ _ => bail ! ( "invalid stream or future handle" ) ,
30253052 } ;
30263053 let id = TableId :: < TransmitHandle > :: new ( rep) ;
30273054 log:: trace!( "guest cancel write {id:?} (handle {writer})" ) ;
@@ -3037,7 +3064,7 @@ impl ComponentInstance {
30373064 }
30383065 }
30393066 let rep = self . get ( id) ?. state . rep ( ) ;
3040- self . host_cancel_write ( rep)
3067+ self . host_cancel_write ( rep, kind )
30413068 }
30423069
30433070 /// Cancel a pending read for the specified stream or future from the guest.
0 commit comments