1- use std:: sync:: { Arc , Mutex } ;
1+ use std:: sync:: {
2+ Arc , Mutex ,
3+ atomic:: { AtomicBool , Ordering } ,
4+ } ;
25
36use defguard_version:: { Version , server:: DefguardVersionLayer } ;
47use tokio:: {
@@ -67,6 +70,7 @@ pub struct GatewaySetupServer {
6770 current_session_token : Arc < Mutex < Option < String > > > ,
6871 setup_tx : Arc < tokio:: sync:: Mutex < Option < oneshot:: Sender < TlsConfig > > > > ,
6972 setup_rx : Arc < tokio:: sync:: Mutex < oneshot:: Receiver < TlsConfig > > > ,
73+ adoption_expired : Arc < AtomicBool > ,
7074}
7175
7276impl Clone for GatewaySetupServer {
@@ -77,6 +81,7 @@ impl Clone for GatewaySetupServer {
7781 current_session_token : Arc :: clone ( & self . current_session_token ) ,
7882 setup_tx : Arc :: clone ( & self . setup_tx ) ,
7983 setup_rx : Arc :: clone ( & self . setup_rx ) ,
84+ adoption_expired : Arc :: clone ( & self . adoption_expired ) ,
8085 }
8186 }
8287}
@@ -91,6 +96,7 @@ impl GatewaySetupServer {
9196 current_session_token : Arc :: new ( Mutex :: new ( None ) ) ,
9297 setup_tx : Arc :: new ( tokio:: sync:: Mutex :: new ( Some ( setup_tx) ) ) ,
9398 setup_rx : Arc :: new ( tokio:: sync:: Mutex :: new ( setup_rx) ) ,
99+ adoption_expired : Arc :: new ( AtomicBool :: new ( false ) ) ,
94100 }
95101 }
96102
@@ -100,7 +106,25 @@ impl GatewaySetupServer {
100106 let setup_rx = Arc :: clone ( & self . setup_rx ) ;
101107
102108 let addr = config. grpc_socket ( ) ;
103- info ! ( "Starting Gateway setup server on {addr} and awaiting configuration from Core" ) ;
109+ let adoption_timeout = config. adoption_timeout ( ) ;
110+ info ! (
111+ "Starting Gateway setup server on {addr} and awaiting configuration from Core for {} min" ,
112+ adoption_timeout. as_secs( ) / 60
113+ ) ;
114+
115+ let adoption_expired = Arc :: clone ( & self . adoption_expired ) ;
116+ let ( cancel_tx, cancel_rx) = oneshot:: channel :: < ( ) > ( ) ;
117+ tokio:: spawn ( async move {
118+ tokio:: select! {
119+ _ = tokio:: time:: sleep( adoption_timeout) => {
120+ adoption_expired. store( true , Ordering :: Relaxed ) ;
121+ error!(
122+ "Gateway adoption expired and is now blocked. Restart the Gateway to enable auto-adoption."
123+ ) ;
124+ }
125+ _ = cancel_rx => { }
126+ }
127+ } ) ;
104128
105129 server_builder
106130 . add_service (
@@ -122,6 +146,11 @@ impl GatewaySetupServer {
122146 } )
123147 . await ?;
124148
149+ // Skip blocking Gateway adoption if adoption was already done
150+ if server_config. is_some ( ) {
151+ let _ = cancel_tx. send ( ( ) ) ;
152+ }
153+
125154 server_config. ok_or_else ( || {
126155 GatewayError :: SetupError ( "Failed to receive setup configuration from Core" . into ( ) )
127156 } )
@@ -175,6 +204,11 @@ impl gateway_setup_server::GatewaySetup for GatewaySetupServer {
175204 #[ instrument( skip( self , request) ) ]
176205 async fn start ( & self , request : Request < ( ) > ) -> Result < Response < Self :: StartStream > , Status > {
177206 debug ! ( "Core initiated setup process, preparing to stream logs" ) ;
207+ if self . adoption_expired . load ( Ordering :: Relaxed ) {
208+ let error_message = "Gateway adoption expired and is now blocked. Restart the Gateway to enable auto-adoption." ;
209+ error ! ( "{error_message}" ) ;
210+ return Err ( Status :: failed_precondition ( error_message) ) ;
211+ }
178212 if self . is_setup_in_progress ( ) {
179213 error ! ( "Setup already in progress, rejecting new setup request" ) ;
180214 return Err ( Status :: resource_exhausted ( "Setup already in progress" ) ) ;
0 commit comments