11use std:: {
22 net:: SocketAddr ,
3- sync:: { Arc , LazyLock , Mutex } ,
3+ sync:: {
4+ Arc , LazyLock , Mutex ,
5+ atomic:: { AtomicBool , Ordering } ,
6+ } ,
47} ;
58
69use defguard_version:: {
710 DefguardComponent , Version ,
811 server:: { DefguardVersionLayer , grpc:: DefguardVersionInterceptor } ,
912} ;
10- use tokio:: sync:: mpsc;
13+ use tokio:: sync:: { mpsc, oneshot } ;
1114use tokio_stream:: wrappers:: UnboundedReceiverStream ;
1215use tonic:: { Request , Response , Status , transport:: Server } ;
1316
1417use crate :: {
1518 CommsChannel , LogsReceiver , MIN_CORE_VERSION , VERSION ,
19+ config:: EnvConfig ,
1620 error:: ApiError ,
1721 grpc:: Configuration ,
1822 proto:: { CertificateInfo , DerPayload , LogEntry , proxy_setup_server} ,
@@ -32,6 +36,7 @@ pub(crate) struct ProxySetupServer {
3236 key_pair : Arc < Mutex < Option < defguard_certs:: RcGenKeyPair > > > ,
3337 logs_rx : LogsReceiver ,
3438 current_session_token : Arc < Mutex < Option < String > > > ,
39+ adoption_expired : Arc < AtomicBool > ,
3540}
3641
3742impl Clone for ProxySetupServer {
@@ -40,6 +45,7 @@ impl Clone for ProxySetupServer {
4045 key_pair : Arc :: clone ( & self . key_pair ) ,
4146 logs_rx : Arc :: clone ( & self . logs_rx ) ,
4247 current_session_token : Arc :: clone ( & self . current_session_token ) ,
48+ adoption_expired : Arc :: clone ( & self . adoption_expired ) ,
4349 }
4450 }
4551}
@@ -50,6 +56,7 @@ impl ProxySetupServer {
5056 key_pair : Arc :: new ( Mutex :: new ( None ) ) ,
5157 logs_rx,
5258 current_session_token : Arc :: new ( Mutex :: new ( None ) ) ,
59+ adoption_expired : Arc :: new ( AtomicBool :: new ( false ) ) ,
5360 }
5461 }
5562
@@ -62,11 +69,29 @@ impl ProxySetupServer {
6269 pub ( crate ) async fn await_initial_setup (
6370 & self ,
6471 addr : SocketAddr ,
72+ config : & EnvConfig ,
6573 ) -> Result < Configuration , anyhow:: Error > {
66- info ! ( "gRPC waiting for setup connection from Core on {addr}" ) ;
74+ let adoption_timeout = config. adoption_timeout ( ) ;
75+ info ! (
76+ "gRPC waiting for setup connection from Core on {addr} for {} seconds" ,
77+ adoption_timeout. as_secs( )
78+ ) ;
6779
80+ let adoption_expired = Arc :: clone ( & self . adoption_expired ) ;
81+ let ( cancel_tx, cancel_rx) = oneshot:: channel :: < ( ) > ( ) ;
82+ tokio:: spawn ( async move {
83+ tokio:: select! {
84+ _ = tokio:: time:: sleep( adoption_timeout) => {
85+ adoption_expired. store( true , Ordering :: Relaxed ) ;
86+ error!(
87+ "Edge adoption expired and is now blocked. Restart the Edge to enable auto-adoption."
88+ ) ;
89+ }
90+ _ = cancel_rx => { }
91+ }
92+ } ) ;
6893 let own_version = Version :: parse ( VERSION ) ?;
69- debug ! ( "Proxy version: {}" , VERSION ) ;
94+ debug ! ( "Edge version: {}" , VERSION ) ;
7095
7196 let config_slot: Arc < tokio:: sync:: Mutex < Option < Configuration > > > =
7297 Arc :: new ( tokio:: sync:: Mutex :: new ( None ) ) ;
@@ -107,6 +132,7 @@ impl ProxySetupServer {
107132 ApiError :: Unexpected ( "No configuration received after setup" . into ( ) )
108133 } ) ?;
109134
135+ let _ = cancel_tx. send ( ( ) ) ;
110136 Ok ( configuration)
111137 }
112138
@@ -158,6 +184,11 @@ impl proxy_setup_server::ProxySetup for ProxySetupServer {
158184 #[ instrument( skip( self , request) ) ]
159185 async fn start ( & self , request : Request < ( ) > ) -> Result < Response < Self :: StartStream > , Status > {
160186 debug ! ( "Core initiated setup process, preparing to stream logs" ) ;
187+ if self . adoption_expired . load ( Ordering :: Relaxed ) {
188+ let error_message = "Edge adoption expired and is now blocked. Restart the Edge to enable auto-adoption." ;
189+ error ! ( "{error_message}" ) ;
190+ return Err ( Status :: failed_precondition ( error_message) ) ;
191+ }
161192 if self . is_setup_in_progress ( ) {
162193 error ! ( "Setup already in progress, rejecting new setup request" ) ;
163194 return Err ( Status :: resource_exhausted ( "Setup already in progress" ) ) ;
0 commit comments