@@ -12,7 +12,9 @@ use oo7::{
1212 Key , Secret ,
1313} ;
1414use tokio:: sync:: { Mutex , RwLock } ;
15+ use tokio_stream:: StreamExt ;
1516use zbus:: {
17+ names:: { OwnedUniqueName , UniqueName } ,
1618 object_server:: SignalEmitter ,
1719 proxy:: Defaults ,
1820 zvariant:: { ObjectPath , OwnedObjectPath , OwnedValue , Value } ,
@@ -46,6 +48,7 @@ impl Service {
4648 & self ,
4749 algorithm : Algorithm ,
4850 input : Value < ' _ > ,
51+ #[ zbus( header) ] header : zbus:: message:: Header < ' _ > ,
4952 #[ zbus( object_server) ] object_server : & zbus:: ObjectServer ,
5053 ) -> Result < ( OwnedValue , OwnedObjectPath ) , ServiceError > {
5154 let ( public_key, aes_key) = match algorithm {
@@ -68,7 +71,13 @@ impl Service {
6871 }
6972 } ;
7073
71- let session = Session :: new ( aes_key. map ( Arc :: new) , self . clone ( ) ) . await ;
74+ let sender = header
75+ . sender ( )
76+ . ok_or_else ( || custom_service_error ( "Failed to get sender from header." ) ) ?;
77+
78+ tracing:: info!( "Client {} connected" , sender) ;
79+
80+ let session = Session :: new ( aes_key. map ( Arc :: new) , self . clone ( ) , sender. to_owned ( ) ) . await ;
7281 let path = session. path ( ) . clone ( ) ;
7382
7483 self . sessions
@@ -335,6 +344,40 @@ impl Service {
335344 . at ( collection. path ( ) . clone ( ) , collection)
336345 . await ?;
337346
347+ let service = service. clone ( ) ;
348+ tokio:: spawn ( async move { service. on_client_disconnect ( ) . await } ) ;
349+ Ok ( ( ) )
350+ }
351+
352+ async fn on_client_disconnect ( & self ) -> zbus:: Result < ( ) > {
353+ let rule = zbus:: MatchRule :: builder ( )
354+ . msg_type ( zbus:: message:: Type :: Signal )
355+ . sender ( "org.freedesktop.DBus" ) ?
356+ . interface ( "org.freedesktop.DBus" ) ?
357+ . member ( "NameOwnerChanged" ) ?
358+ . arg ( 2 , "" ) ?
359+ . build ( ) ;
360+ let mut stream = zbus:: MessageStream :: for_match_rule ( rule, & self . connection , None ) . await ?;
361+ while let Some ( message) = stream. try_next ( ) . await ? {
362+ let Ok ( ( _name, old_owner, new_owner) ) =
363+ message
364+ . body ( )
365+ . deserialize :: < ( String , OwnedUniqueName , OwnedUniqueName ) > ( )
366+ else {
367+ continue ;
368+ } ;
369+ assert_eq ! ( new_owner, "" ) ; // We enforce that in the matching rule
370+ if let Some ( session) = self . session_from_sender ( old_owner. as_ref ( ) ) . await {
371+ match session. close ( ) . await {
372+ Ok ( _) => tracing:: info!(
373+ "Client {} disconnected. Session: {} closed." ,
374+ old_owner,
375+ session. path( )
376+ ) ,
377+ Err ( err) => tracing:: error!( "Failed to close session: {}" , err) ,
378+ }
379+ }
380+ }
338381 Ok ( ( ) )
339382 }
340383
@@ -423,6 +466,18 @@ impl Service {
423466 n_sessions
424467 }
425468
469+ async fn session_from_sender < ' a > ( & self , sender : UniqueName < ' a > ) -> Option < Session > {
470+ let sessions = self . sessions . lock ( ) . await ;
471+
472+ for session in sessions. values ( ) {
473+ if session. sender ( ) == & sender {
474+ return Some ( session. clone ( ) ) ;
475+ }
476+ }
477+
478+ None
479+ }
480+
426481 pub async fn session ( & self , path : & OwnedObjectPath ) -> Option < Session > {
427482 self . sessions . lock ( ) . await . get ( path) . cloned ( )
428483 }
0 commit comments