@@ -20,7 +20,7 @@ use wasmtime::component::{
2020
2121use crate :: p3:: DEFAULT_BUFFER_CAPACITY ;
2222use crate :: p3:: bindings:: sockets:: types:: {
23- Duration , ErrorCode , HostTcpSocket , HostTcpSocketConcurrent , IpAddressFamily , IpSocketAddress ,
23+ Duration , ErrorCode , HostTcpSocket , HostTcpSocketWithStore , IpAddressFamily , IpSocketAddress ,
2424 TcpSocket ,
2525} ;
2626use crate :: p3:: sockets:: WasiSockets ;
@@ -192,6 +192,18 @@ impl<T> AccessorTask<T, WasiSockets, wasmtime::Result<()>> for ListenTask {
192192 }
193193}
194194
195+ struct ResultWriteTask {
196+ result : Result < ( ) , ErrorCode > ,
197+ result_tx : FutureWriter < Result < ( ) , ErrorCode > > ,
198+ }
199+
200+ impl < T > AccessorTask < T , WasiSockets , wasmtime:: Result < ( ) > > for ResultWriteTask {
201+ async fn run ( self , store : & Accessor < T , WasiSockets > ) -> wasmtime:: Result < ( ) > {
202+ self . result_tx . write ( store, self . result ) . await ;
203+ Ok ( ( ) )
204+ }
205+ }
206+
195207struct ReceiveTask {
196208 stream : Arc < TcpStream > ,
197209 data_tx : StreamWriter < Cursor < BytesMut > > ,
@@ -238,16 +250,22 @@ impl<T> AccessorTask<T, WasiSockets, wasmtime::Result<()>> for ReceiveTask {
238250 }
239251 }
240252 } ;
241- self . result_tx . write ( store, res) . await ;
242253 _ = self
243254 . stream
244255 . as_socketlike_view :: < std:: net:: TcpStream > ( )
245256 . shutdown ( Shutdown :: Read ) ;
257+
258+ // Write the result async from a separate task to ensure that all resources used by this
259+ // task are freed
260+ store. spawn ( ResultWriteTask {
261+ result : res,
262+ result_tx : self . result_tx ,
263+ } ) ;
246264 Ok ( ( ) )
247265 }
248266}
249267
250- impl HostTcpSocketConcurrent for WasiSockets {
268+ impl HostTcpSocketWithStore for WasiSockets {
251269 async fn bind < T > (
252270 store : & Accessor < T , Self > ,
253271 socket : Resource < TcpSocket > ,
@@ -283,7 +301,7 @@ impl HostTcpSocketConcurrent for WasiSockets {
283301 || !is_valid_remote_address ( remote_address)
284302 || !is_valid_address_family ( ip, socket. family )
285303 {
286- return Ok ( Err ( ErrorCode :: InvalidArgument ) ) ;
304+ return anyhow :: Ok ( Err ( ErrorCode :: InvalidArgument ) ) ;
287305 }
288306 match mem:: replace ( & mut socket. tcp_state , TcpState :: Connecting ) {
289307 TcpState :: Default ( sock) | TcpState :: Bound ( sock) => Ok ( Ok ( sock) ) ,
@@ -433,25 +451,28 @@ impl HostTcpSocketConcurrent for WasiSockets {
433451 let ( data_tx, data_rx) = instance
434452 . stream :: < _ , _ , BytesMut > ( & mut view)
435453 . context ( "failed to create stream" ) ?;
436- let ( result_tx, result_rx) = instance
437- . future ( || unreachable ! ( ) , & mut view)
438- . context ( "failed to create future" ) ?;
439454 let TcpSocket { tcp_state, .. } = get_socket_mut ( view. get ( ) . table , & socket) ?;
440455 match mem:: replace ( tcp_state, TcpState :: Closed ) {
441456 TcpState :: Connected ( stream) => {
442457 * tcp_state = TcpState :: Receiving ( Arc :: clone ( & stream) ) ;
458+ let ( result_tx, result_rx) = instance
459+ . future ( || unreachable ! ( ) , & mut view)
460+ . context ( "failed to create future" ) ?;
443461 view. spawn ( ReceiveTask {
444462 stream,
445463 data_tx,
446464 result_tx,
447465 } ) ;
466+ Ok ( ( data_rx. into ( ) , result_rx. into ( ) ) )
448467 }
449468 prev => {
450469 * tcp_state = prev;
451- _ = result_tx. write ( store, Err ( ErrorCode :: InvalidState ) ) ;
470+ let ( _, result_rx) = instance
471+ . future ( || Err ( ErrorCode :: InvalidState ) , & mut view)
472+ . context ( "failed to create future" ) ?;
473+ Ok ( ( data_rx. into ( ) , result_rx. into ( ) ) )
452474 }
453- } ;
454- Ok ( ( data_rx. into ( ) , result_rx. into ( ) ) )
475+ }
455476 } )
456477 }
457478}
0 commit comments