11use std:: {
22 any:: Any ,
3+ cell:: RefCell ,
34 collections:: HashMap ,
45 rc:: Rc ,
56 sync:: {
@@ -20,7 +21,6 @@ use futures::{
2021 io:: BufReader ,
2122 select_biased,
2223} ;
23- use parking_lot:: Mutex ;
2424use serde:: { Deserialize , Serialize , de:: DeserializeOwned } ;
2525use serde_json:: value:: RawValue ;
2626
@@ -29,7 +29,7 @@ use crate::{Error, StreamReceiver};
2929
3030pub struct RpcConnection < Local : Side , Remote : Side > {
3131 outgoing_tx : UnboundedSender < OutgoingMessage < Local , Remote > > ,
32- pending_responses : Arc < Mutex < HashMap < i32 , PendingResponse > > > ,
32+ pending_responses : Rc < RefCell < HashMap < i32 , PendingResponse > > > ,
3333 next_id : AtomicI32 ,
3434 broadcast : StreamBroadcast ,
3535}
5757 let ( incoming_tx, incoming_rx) = mpsc:: unbounded ( ) ;
5858 let ( outgoing_tx, outgoing_rx) = mpsc:: unbounded ( ) ;
5959
60- let pending_responses = Arc :: new ( Mutex :: new ( HashMap :: default ( ) ) ) ;
60+ let pending_responses = Rc :: new ( RefCell :: new ( HashMap :: default ( ) ) ) ;
6161 let ( broadcast_tx, broadcast) = StreamBroadcast :: new ( ) ;
6262
6363 let io_task = {
7272 broadcast_tx,
7373 )
7474 . await ;
75- pending_responses. lock ( ) . clear ( ) ;
75+ pending_responses. borrow_mut ( ) . clear ( ) ;
7676 result
7777 }
7878 } ;
@@ -121,7 +121,7 @@ where
121121 ) -> impl Future < Output = Result < Out , Error > > {
122122 let ( tx, rx) = oneshot:: channel ( ) ;
123123 let id = self . next_id . fetch_add ( 1 , Ordering :: SeqCst ) ;
124- self . pending_responses . lock ( ) . insert (
124+ self . pending_responses . borrow_mut ( ) . insert (
125125 id,
126126 PendingResponse {
127127 deserialize : |value| {
@@ -144,7 +144,7 @@ where
144144 } )
145145 . is_err ( )
146146 {
147- self . pending_responses . lock ( ) . remove ( & id) ;
147+ self . pending_responses . borrow_mut ( ) . remove ( & id) ;
148148 }
149149 async move {
150150 let result = rx
@@ -162,7 +162,7 @@ where
162162 mut outgoing_rx : UnboundedReceiver < OutgoingMessage < Local , Remote > > ,
163163 mut outgoing_bytes : impl Unpin + AsyncWrite ,
164164 incoming_bytes : impl Unpin + AsyncRead ,
165- pending_responses : Arc < Mutex < HashMap < i32 , PendingResponse > > > ,
165+ pending_responses : Rc < RefCell < HashMap < i32 , PendingResponse > > > ,
166166 broadcast : StreamSender ,
167167 ) -> Result < ( ) > {
168168 // TODO: Create nicer abstraction for broadcast
@@ -213,7 +213,7 @@ where
213213 broadcast. outgoing( & error_response) ;
214214 }
215215 }
216- } else if let Some ( pending_response) = pending_responses. lock ( ) . remove( & id) {
216+ } else if let Some ( pending_response) = pending_responses. borrow_mut ( ) . remove( & id) {
217217 // Response
218218 if let Some ( result_value) = message. result {
219219 broadcast. incoming_response( id, Ok ( Some ( result_value) ) ) ;
0 commit comments