11use anyhow:: Context as _;
22use async_executor:: { LocalExecutor , Task } ;
33use async_net:: { SocketAddr , TcpListener , TcpStream } ;
4+
45use futures_lite:: { future, AsyncWriteExt } ;
56use puffin:: { FrameSinkId , GlobalProfiler , ScopeCollection } ;
67use std:: {
7- cell:: RefCell ,
88 rc:: Rc ,
99 sync:: {
1010 atomic:: { AtomicUsize , Ordering } ,
1111 Arc ,
1212 } ,
1313} ;
14+ use unsend:: lock:: RwLock ;
1415
1516/// Maximum size of the backlog of packets to send to a client if they aren't reading fast enough.
1617const MAX_FRAMES_IN_QUEUE : usize = 30 ;
@@ -267,7 +268,7 @@ impl Server {
267268 ) -> anyhow:: Result < ( ) > {
268269 let executor = Rc :: new ( LocalExecutor :: new ( ) ) ;
269270
270- let clients = Rc :: new ( RefCell :: new ( Vec :: new ( ) ) ) ;
271+ let clients = Rc :: new ( RwLock :: new ( Vec :: new ( ) ) ) ;
271272 let clients_cloned = clients. clone ( ) ;
272273 let num_clients_cloned = num_clients. clone ( ) ;
273274
@@ -304,6 +305,7 @@ impl Server {
304305 log:: warn!( "puffin server failure: {}" , err) ;
305306 }
306307 }
308+ log:: trace!( "End to Wait frame to send" ) ;
307309 } ) ;
308310 //.context("Couldn't spawn ps-send task")?;
309311
@@ -358,7 +360,7 @@ impl Drop for Client {
358360struct PuffinServerConnection < ' a > {
359361 executor : Rc < LocalExecutor < ' a > > ,
360362 tcp_listener : TcpListener ,
361- clients : Rc < RefCell < Vec < Client > > > ,
363+ clients : Rc < RwLock < Vec < Client > > > ,
362364 num_clients : Arc < AtomicUsize > ,
363365}
364366
@@ -382,14 +384,14 @@ impl<'a> PuffinServerConnection<'a> {
382384 // Send all scopes when new client connects.
383385 // TODO: send all previous scopes at connection, not on regular send
384386 //self.send_all_scopes = true;
385- self . clients . borrow_mut ( ) . push ( Client {
387+ self . clients . write ( ) . await . push ( Client {
386388 client_addr,
387389 packet_tx : Some ( packet_tx) ,
388390 join_handle : Some ( join_handle) ,
389391 send_all_scopes : true ,
390392 } ) ;
391393 self . num_clients
392- . store ( self . clients . borrow ( ) . len ( ) , Ordering :: SeqCst ) ;
394+ . store ( self . clients . read ( ) . await . len ( ) , Ordering :: SeqCst ) ;
393395 }
394396 Err ( e) if e. kind ( ) == std:: io:: ErrorKind :: WouldBlock => {
395397 break ; // Nothing to do for now.
@@ -405,14 +407,14 @@ impl<'a> PuffinServerConnection<'a> {
405407
406408/// streams to client puffin profiler data.
407409struct PuffinServerSend {
408- clients : Rc < RefCell < Vec < Client > > > ,
410+ clients : Rc < RwLock < Vec < Client > > > ,
409411 num_clients : Arc < AtomicUsize > ,
410412 scope_collection : ScopeCollection ,
411413}
412414
413415impl PuffinServerSend {
414416 pub async fn send ( & mut self , frame : & puffin:: FrameData ) -> anyhow:: Result < ( ) > {
415- if self . clients . borrow ( ) . is_empty ( ) {
417+ if self . clients . read ( ) . await . is_empty ( ) {
416418 return Ok ( ( ) ) ;
417419 }
418420 puffin:: profile_function!( ) ;
@@ -443,9 +445,8 @@ impl PuffinServerSend {
443445 let packet_all_scopes: Packet = packet_all_scopes. into ( ) ;
444446
445447 // Send frame to clients, remove disconnected clients and update num_clients var
446- let clients = self . clients . borrow ( ) ;
447448 let mut idx_to_remove = Vec :: new ( ) ;
448- for ( idx, client) in clients. iter ( ) . enumerate ( ) {
449+ for ( idx, client) in self . clients . read ( ) . await . iter ( ) . enumerate ( ) {
449450 let packet = if client. send_all_scopes {
450451 packet_all_scopes. clone ( )
451452 } else {
@@ -456,7 +457,7 @@ impl PuffinServerSend {
456457 }
457458 }
458459
459- let mut clients = self . clients . borrow_mut ( ) ;
460+ let mut clients = self . clients . write ( ) . await ;
460461 idx_to_remove. iter ( ) . rev ( ) . for_each ( |idx| {
461462 clients. remove ( * idx) ;
462463 } ) ;
0 commit comments