@@ -213,7 +213,6 @@ impl ClientConnectionSender {
213213pub struct ClientConnection {
214214 sender : Arc < ClientConnectionSender > ,
215215 pub replica_id : u64 ,
216- pub module : ModuleHost ,
217216 module_rx : watch:: Receiver < ModuleHost > ,
218217}
219218
@@ -480,7 +479,6 @@ impl ClientConnection {
480479 let this = Self {
481480 sender,
482481 replica_id,
483- module,
484482 module_rx,
485483 } ;
486484
@@ -495,13 +493,11 @@ impl ClientConnection {
495493 id : ClientActorId ,
496494 config : ClientConfig ,
497495 replica_id : u64 ,
498- mut module_rx : watch:: Receiver < ModuleHost > ,
496+ module_rx : watch:: Receiver < ModuleHost > ,
499497 ) -> Self {
500- let module = module_rx. borrow_and_update ( ) . clone ( ) ;
501498 Self {
502499 sender : Arc :: new ( ClientConnectionSender :: dummy ( id, config) ) ,
503500 replica_id,
504- module,
505501 module_rx,
506502 }
507503 }
@@ -510,6 +506,20 @@ impl ClientConnection {
510506 self . sender . clone ( )
511507 }
512508
509+ /// Get the [`ModuleHost`] for this connection.
510+ ///
511+ /// Note that modules can be hotswapped, in which case the returned handle
512+ /// becomes invalid (i.e. all calls on it will result in an error).
513+ /// Callers should thus drop the value as soon as they are done, and obtain
514+ /// a fresh one when needed.
515+ ///
516+ /// While this [`ClientConnection`] is active, [`Self::watch_module_host`]
517+ /// should be polled in the background, and the connection closed if and
518+ /// when it returns an error.
519+ pub fn module ( & self ) -> ModuleHost {
520+ self . module_rx . borrow ( ) . clone ( )
521+ }
522+
513523 #[ inline]
514524 pub fn handle_message (
515525 & self ,
@@ -519,13 +529,26 @@ impl ClientConnection {
519529 message_handlers:: handle ( self , message. into ( ) , timer)
520530 }
521531
532+ /// Waits until the [`ModuleHost`] of this [`ClientConnection`] instance
533+ /// exits, in which case `Err` containing [`NoSuchModule`] is returned.
534+ ///
535+ /// Should be polled while this [`ClientConnection`] is active, so as to be
536+ /// able to shut down the connection gracefully if and when the module
537+ /// exits.
538+ ///
539+ /// Note that this borrows `self` mutably, so may require cloning the
540+ /// [`ClientConnection`] instance. The module is shared, however, so all
541+ /// clones will observe a swapped module.
522542 pub async fn watch_module_host ( & mut self ) -> Result < ( ) , NoSuchModule > {
523- match self . module_rx . changed ( ) . await {
524- Ok ( ( ) ) => {
525- self . module = self . module_rx . borrow_and_update ( ) . clone ( ) ;
526- Ok ( ( ) )
543+ loop {
544+ // First check if the module exited between creating the client
545+ // connection and calling `watch_module_host`...
546+ if self . module_rx . changed ( ) . await . is_err ( ) {
547+ return Err ( NoSuchModule ) ;
527548 }
528- Err ( _) => Err ( NoSuchModule ) ,
549+ // ...then mark the current module as seen, so the next iteration
550+ // of the loop waits until the module changes or exits.
551+ self . module_rx . mark_unchanged ( ) ;
529552 }
530553 }
531554
@@ -544,7 +567,7 @@ impl ClientConnection {
544567 CallReducerFlags :: NoSuccessNotify => None ,
545568 } ;
546569
547- self . module
570+ self . module ( )
548571 . call_reducer (
549572 self . id . identity ,
550573 Some ( self . id . connection_id ) ,
@@ -563,9 +586,9 @@ impl ClientConnection {
563586 timer : Instant ,
564587 ) -> Result < Option < ExecutionMetrics > , DBError > {
565588 let me = self . clone ( ) ;
566- self . module
589+ self . module ( )
567590 . on_module_thread ( "subscribe_single" , move || {
568- me. module
591+ me. module ( )
569592 . subscriptions ( )
570593 . add_single_subscription ( me. sender , subscription, timer, None )
571594 } )
@@ -575,7 +598,7 @@ impl ClientConnection {
575598 pub async fn unsubscribe ( & self , request : Unsubscribe , timer : Instant ) -> Result < Option < ExecutionMetrics > , DBError > {
576599 let me = self . clone ( ) ;
577600 asyncify ( move || {
578- me. module
601+ me. module ( )
579602 . subscriptions ( )
580603 . remove_single_subscription ( me. sender , request, timer)
581604 } )
@@ -588,9 +611,9 @@ impl ClientConnection {
588611 timer : Instant ,
589612 ) -> Result < Option < ExecutionMetrics > , DBError > {
590613 let me = self . clone ( ) ;
591- self . module
614+ self . module ( )
592615 . on_module_thread ( "subscribe_multi" , move || {
593- me. module
616+ me. module ( )
594617 . subscriptions ( )
595618 . add_multi_subscription ( me. sender , request, timer, None )
596619 } )
@@ -603,9 +626,9 @@ impl ClientConnection {
603626 timer : Instant ,
604627 ) -> Result < Option < ExecutionMetrics > , DBError > {
605628 let me = self . clone ( ) ;
606- self . module
629+ self . module ( )
607630 . on_module_thread ( "unsubscribe_multi" , move || {
608- me. module
631+ me. module ( )
609632 . subscriptions ( )
610633 . remove_multi_subscription ( me. sender , request, timer)
611634 } )
@@ -615,7 +638,7 @@ impl ClientConnection {
615638 pub async fn subscribe ( & self , subscription : Subscribe , timer : Instant ) -> Result < ExecutionMetrics , DBError > {
616639 let me = self . clone ( ) ;
617640 asyncify ( move || {
618- me. module
641+ me. module ( )
619642 . subscriptions ( )
620643 . add_legacy_subscriber ( me. sender , subscription, timer, None )
621644 } )
@@ -628,7 +651,7 @@ impl ClientConnection {
628651 message_id : & [ u8 ] ,
629652 timer : Instant ,
630653 ) -> Result < ( ) , anyhow:: Error > {
631- self . module
654+ self . module ( )
632655 . one_off_query :: < JsonFormat > (
633656 self . id . identity ,
634657 query. to_owned ( ) ,
@@ -646,7 +669,7 @@ impl ClientConnection {
646669 message_id : & [ u8 ] ,
647670 timer : Instant ,
648671 ) -> Result < ( ) , anyhow:: Error > {
649- self . module
672+ self . module ( )
650673 . one_off_query :: < BsatnFormat > (
651674 self . id . identity ,
652675 query. to_owned ( ) ,
@@ -659,6 +682,6 @@ impl ClientConnection {
659682 }
660683
661684 pub async fn disconnect ( self ) {
662- self . module . disconnect_client ( self . id ) . await
685+ self . module ( ) . disconnect_client ( self . id ) . await
663686 }
664687}
0 commit comments