@@ -4,7 +4,7 @@ use std::{
44 net:: SocketAddr ,
55 sync:: {
66 atomic:: { AtomicBool , AtomicU64 , Ordering } ,
7- Arc , LazyLock , Mutex ,
7+ Arc , Mutex ,
88 } ,
99} ;
1010
@@ -25,25 +25,13 @@ use tracing::Instrument;
2525use crate :: {
2626 error:: ApiError ,
2727 http:: GRPC_SERVER_RESTART_CHANNEL ,
28- proto:: {
29- core_request, core_response, proxy_server, proxy_setup_request, proxy_setup_response,
30- CertResponse , CoreRequest , CoreResponse , CsrRequest , DeviceInfo , Done , ProxySetupRequest ,
31- ProxySetupResponse ,
32- } ,
33- CommsChannel , MIN_CORE_VERSION , VERSION ,
28+ proto:: { core_request, core_response, proxy_server, CoreRequest , CoreResponse , DeviceInfo } ,
29+ MIN_CORE_VERSION , VERSION ,
3430} ;
3531
3632// connected clients
3733type ClientMap = HashMap < SocketAddr , mpsc:: UnboundedSender < Result < CoreRequest , Status > > > ;
3834
39- static SETUP_CHANNEL : LazyLock < CommsChannel < Option < Configuration > > > = LazyLock :: new ( || {
40- let ( tx, rx) = mpsc:: channel ( 10 ) ;
41- (
42- Arc :: new ( tokio:: sync:: Mutex :: new ( tx) ) ,
43- Arc :: new ( tokio:: sync:: Mutex :: new ( rx) ) ,
44- )
45- } ) ;
46-
4735#[ derive( Debug , Clone , Default ) ]
4836pub ( crate ) struct Configuration {
4937 pub ( crate ) grpc_key_pem : String ,
@@ -83,58 +71,6 @@ impl ProxyServer {
8371 Ok ( ( ) )
8472 }
8573
86- pub ( crate ) async fn await_setup (
87- & self ,
88- addr : SocketAddr ,
89- ) -> Result < Configuration , anyhow:: Error > {
90- info ! ( "gRPC waiting for setup connection from Core on {addr}" ) ;
91- let server_builder = Server :: builder ( ) ;
92- let mut server_config = None ;
93-
94- let own_version = Version :: parse ( VERSION ) ?;
95-
96- server_builder
97- . layer ( tonic:: service:: InterceptorLayer :: new (
98- DefguardVersionInterceptor :: new (
99- own_version. clone ( ) ,
100- DefguardComponent :: Core ,
101- MIN_CORE_VERSION ,
102- false ,
103- ) ,
104- ) )
105- . layer ( DefguardVersionLayer :: new ( own_version) )
106- . add_service ( proxy_server:: ProxyServer :: new ( self . clone ( ) ) )
107- . serve_with_shutdown ( addr, async {
108- let config = SETUP_CHANNEL . 1 . lock ( ) . await . recv ( ) . await ;
109- if let Some ( cfg) = config {
110- debug ! ( "Received the passed Proxy configuration" ) ;
111- server_config = cfg;
112- } else {
113- error ! ( "Setup communication channel closed unexpectedly" ) ;
114- }
115- } )
116- . await
117- . map_err ( |err| {
118- error ! ( "gRPC server error during setup: {err}" ) ;
119- ApiError :: Unexpected ( "gRPC server error during setup" . into ( ) )
120- } ) ?;
121-
122- debug ! ( "gRPC setup server on {addr} has shutdown after completing setup" ) ;
123-
124- Ok ( server_config. map_or_else (
125- || {
126- error ! ( "No server configuration received after setup completion" ) ;
127- Err ( ApiError :: Unexpected (
128- "No server configuration received after setup" . into ( ) ,
129- ) )
130- } ,
131- |cfg| {
132- debug ! ( "Returning received server configuration" ) ;
133- Ok ( cfg)
134- } ,
135- ) ?)
136- }
137-
13874 pub ( crate ) fn configure ( & self , config : Configuration ) {
13975 let mut lock = self . config . lock ( ) . unwrap ( ) ;
14076 * lock = Some ( config) ;
@@ -241,231 +177,6 @@ impl Clone for ProxyServer {
241177#[ tonic:: async_trait]
242178impl proxy_server:: Proxy for ProxyServer {
243179 type BidiStream = UnboundedReceiverStream < Result < CoreRequest , Status > > ;
244- type SetupStream = UnboundedReceiverStream < Result < ProxySetupRequest , Status > > ;
245-
246- async fn setup (
247- & self ,
248- request : Request < tonic:: Streaming < ProxySetupResponse > > ,
249- ) -> Result < Response < Self :: SetupStream > , Status > {
250- if self . setup_in_progress . swap ( true , Ordering :: SeqCst ) {
251- info ! ( "Proxy setup already in progress, rejecting concurrent setup attempt" ) ;
252- return Err ( Status :: already_exists ( "Proxy setup is already in progress" ) ) ;
253- }
254-
255- info ! ( "Starting proxy setup handshake" ) ;
256-
257- let ( tx, rx) = mpsc:: unbounded_channel ( ) ;
258- let tls_configured = self . setup_completed ( ) ;
259- let current_configuration = self . get_configuration ( ) ;
260- let setup_in_progress = Arc :: clone ( & self . setup_in_progress ) ;
261-
262- tokio:: spawn (
263- async move {
264- let mut stream = request. into_inner ( ) ;
265- let mut setup_successful = false ;
266- let initial_info = match stream. message ( ) . await {
267- Ok ( Some ( ProxySetupResponse {
268- payload : Some ( proxy_setup_response:: Payload :: InitialSetupInfo ( info) ) ,
269- } ) ) => {
270- info ! ( "Received initial setup information from Defguard Core" ) ;
271- debug ! ( "Initial setup info: {:?}" , info) ;
272- info
273- }
274- Ok ( Some ( ProxySetupResponse {
275- payload : Some ( proxy_setup_response:: Payload :: Done ( Done { } ) ) ,
276- } ) ) => {
277- info ! ( "Received setup termination message from Defguard Core, skipping setup phase" ) ;
278- setup_in_progress. store ( false , Ordering :: SeqCst ) ;
279- return ;
280- }
281- Ok ( Some ( msg) ) => {
282- error ! ( "Unexpected payload type in initial Proxy setup message: {:?}" , msg) ;
283- let _ = tx. send ( Err ( Status :: invalid_argument (
284- "Unexpected payload type in initial Proxy setup message" ,
285- ) ) ) ;
286- setup_in_progress. store ( false , Ordering :: SeqCst ) ;
287- return ;
288- }
289- Ok ( None ) => {
290- error ! ( "No initial Proxy setup message received from Defguard Core" ) ;
291- let _ = tx. send ( Err ( Status :: aborted (
292- "No initial Proxy setup message received from Defguard Core" ,
293- ) ) ) ;
294- setup_in_progress. store ( false , Ordering :: SeqCst ) ;
295- return ;
296- }
297- Err ( err) => {
298- error ! ( "Error receiving initial Proxy setup message from Defguard Core: {err}" ) ;
299- let _ = tx. send ( Err ( Status :: internal ( format ! (
300- "Error receiving initial message: {err}"
301- ) ) ) ) ;
302- setup_in_progress. store ( false , Ordering :: SeqCst ) ;
303- return ;
304- }
305- } ;
306-
307- if tls_configured {
308- info ! ( "Certificates already generated, skipping CSR generation" ) ;
309- if let Err ( err) = tx. send ( Ok ( ProxySetupRequest {
310- payload : Some ( proxy_setup_request:: Payload :: Done ( Done { } ) ) ,
311- } ) ) {
312- error ! ( "Failed to send Done message: {err}" ) ;
313- }
314- return ;
315- }
316-
317- info ! ( "Generating new key pair and CSR for certificate issuance" ) ;
318- let key_pair = match defguard_certs:: generate_key_pair ( ) {
319- Ok ( kp) => kp,
320- Err ( err) => {
321- error ! ( "Failed to generate key pair: {err}" ) ;
322- let _ = tx. send ( Err ( Status :: internal ( format ! (
323- "Failed to generate key pair: {err}"
324- ) ) ) ) ;
325- setup_in_progress. store ( false , Ordering :: SeqCst ) ;
326- return ;
327- }
328- } ;
329-
330- let subject_alt_names = vec ! [ initial_info. cert_hostname] ;
331-
332- let csr = match defguard_certs:: Csr :: new (
333- & key_pair,
334- & subject_alt_names,
335- vec ! [
336- // TODO: Change it?
337- ( defguard_certs:: DnType :: CommonName , "Defguard Proxy" ) ,
338- ( defguard_certs:: DnType :: OrganizationName , "Defguard" ) ,
339- ] ,
340- ) {
341- Ok ( csr) => csr,
342- Err ( err) => {
343- error ! ( "Failed to generate CSR: {err}" ) ;
344- let _ = tx. send ( Err ( Status :: internal ( format ! (
345- "Failed to generate CSR: {err}"
346- ) ) ) ) ;
347- setup_in_progress. store ( false , Ordering :: SeqCst ) ;
348- return ;
349- }
350- } ;
351-
352- let csr_der = csr. to_der ( ) ;
353- let csr_request = CsrRequest {
354- csr_der : csr_der. to_vec ( ) ,
355- } ;
356-
357- if let Err ( err) = tx. send ( Ok ( ProxySetupRequest {
358- payload : Some ( proxy_setup_request:: Payload :: CsrRequest ( csr_request) ) ,
359- } ) ) {
360- error ! ( "Failed to send CsrRequest: {err}" ) ;
361- setup_in_progress. store ( false , Ordering :: SeqCst ) ;
362- return ;
363- }
364-
365- debug ! ( "Sent CSR request to Core" ) ;
366-
367- let mut configuration = current_configuration;
368-
369- loop {
370- match stream. message ( ) . await {
371- Ok ( Some ( response) ) => match response. payload {
372- Some ( proxy_setup_response:: Payload :: CertResponse ( CertResponse {
373- cert_der,
374- } ) ) => {
375- debug ! ( "Received CertResponse from Defguard Core" ) ;
376- let grpc_cert_pem = match defguard_certs:: der_to_pem (
377- & cert_der,
378- defguard_certs:: PemLabel :: Certificate ,
379- ) {
380- Ok ( pem) => pem,
381- Err ( err) => {
382- error ! ( "Failed to convert certificate DER to PEM: {err}" ) ;
383- let _ = tx. send ( Err ( Status :: internal ( format ! (
384- "Failed to convert certificate DER to PEM: {err}"
385- ) ) ) ) ;
386- setup_in_progress. store ( false , Ordering :: SeqCst ) ;
387- return ;
388- }
389- } ;
390-
391- configuration = Some ( Configuration {
392- grpc_key_pem : key_pair. serialize_pem ( ) ,
393- grpc_cert_pem,
394- } ) ;
395-
396- if let Err ( err) = tx. send ( Ok ( ProxySetupRequest {
397- payload : Some ( proxy_setup_request:: Payload :: Done ( Done { } ) ) ,
398- } ) ) {
399- error ! ( "Failed to send Done message: {err}" ) ;
400- // Can't send error since tx already failed
401- setup_in_progress. store ( false , Ordering :: SeqCst ) ;
402- return ;
403- }
404-
405- debug ! ( "Sent Done message to Defguard Core" ) ;
406- }
407- Some ( proxy_setup_response:: Payload :: Done ( Done { } ) ) => {
408- debug ! ( "Received setup termination message from Defguard Core" ) ;
409- let lock = SETUP_CHANNEL . 0 . lock ( ) . await ;
410- debug ! ( "Passing Proxy configuration to the gRPC server" ) ;
411- if let Some ( configuration) = configuration. take ( ) {
412- if let Err ( err) = lock. send ( Some ( configuration) ) . await {
413- error ! ( "Failed to pass the received Proxy configuration to the gRPC server: {err:?}" ) ;
414- } else {
415- debug ! ( "Passed Proxy configuration to the gRPC server" ) ;
416- setup_successful = true ;
417- }
418- } else {
419- error ! (
420- "No configuration available to pass to the gRPC server on setup completion"
421- ) ;
422- }
423-
424- info ! ( "Setup handshake completed successfully" ) ;
425- break ;
426- }
427- _ => {
428- error ! (
429- "Unexpected payload type while waiting for setup message: {:?}" ,
430- response. payload
431- ) ;
432- let _ = tx. send ( Err ( Status :: invalid_argument (
433- "Unexpected payload type while waiting for setup message" ,
434- ) ) ) ;
435- setup_in_progress. store ( false , Ordering :: SeqCst ) ;
436- return ;
437- }
438- } ,
439- Ok ( None ) => {
440- error ! ( "gRPC setup stream has been closed unexpectedly" ) ;
441- let _ = tx. send ( Err ( Status :: aborted (
442- "gRPC stream has been closed unexpectedly" ,
443- ) ) ) ;
444- setup_in_progress. store ( false , Ordering :: SeqCst ) ;
445- return ;
446- }
447- Err ( err) => {
448- error ! ( "gRPC setup client error while waiting for CertResponse: {err}" ) ;
449- let _ =
450- tx. send ( Err ( Status :: internal ( format ! ( "gRPC client error: {err}" ) ) ) ) ;
451- setup_in_progress. store ( false , Ordering :: SeqCst ) ;
452- return ;
453- }
454- }
455- }
456-
457- setup_in_progress. store ( false , Ordering :: SeqCst ) ;
458- if setup_successful {
459- info ! ( "Proxy setup completed successfully" ) ;
460- } else {
461- warn ! ( "Proxy setup did not complete successfully" ) ;
462- }
463- }
464- . instrument ( tracing:: Span :: current ( ) ) ,
465- ) ;
466-
467- Ok ( Response :: new ( UnboundedReceiverStream :: new ( rx) ) )
468- }
469180
470181 /// Handle bidirectional communication with Defguard core.
471182 #[ instrument( name = "bidirectional_communication" , level = "info" , skip( self ) ) ]
0 commit comments