Skip to content

Commit 872b3a0

Browse files
Purge RPC handling (#250)
* wip reset handler stub * remove cert files on reset * go into setup mode after reset * rename reset -> purge * update protos * cargo fmt
1 parent a26a19f commit 872b3a0

File tree

4 files changed

+118
-28
lines changed

4 files changed

+118
-28
lines changed

proto

src/grpc.rs

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use std::{
22
any::Any,
33
collections::HashMap,
4+
future::Future,
45
net::SocketAddr,
6+
path::PathBuf,
57
sync::{
68
atomic::{AtomicBool, AtomicU64, Ordering},
79
Arc, Mutex, RwLock,
@@ -14,7 +16,7 @@ use defguard_version::{
1416
server::{grpc::DefguardVersionInterceptor, DefguardVersionLayer},
1517
ComponentInfo, DefguardComponent, Version,
1618
};
17-
use tokio::sync::{mpsc, oneshot};
19+
use tokio::sync::{broadcast, mpsc, oneshot};
1820
use tokio_stream::wrappers::UnboundedReceiverStream;
1921
use tonic::{
2022
transport::{Identity, Server, ServerTlsConfig},
@@ -25,6 +27,7 @@ use tracing::Instrument;
2527

2628
use crate::{
2729
error::ApiError,
30+
http::{GRPC_CERT_NAME, GRPC_KEY_NAME},
2831
proto::{core_request, core_response, proxy_server, CoreRequest, CoreResponse, DeviceInfo},
2932
MIN_CORE_VERSION, VERSION,
3033
};
@@ -46,12 +49,18 @@ pub(crate) struct ProxyServer {
4649
pub(crate) core_version: Arc<Mutex<Option<Version>>>,
4750
config: Arc<Mutex<Option<Configuration>>>,
4851
cookie_key: Arc<RwLock<Option<Key>>>,
52+
cert_dir: PathBuf,
53+
reset_tx: broadcast::Sender<()>,
4954
}
5055

5156
impl ProxyServer {
5257
#[must_use]
5358
/// Create new `ProxyServer`.
54-
pub(crate) fn new(cookie_key: Arc<RwLock<Option<Key>>>) -> Self {
59+
pub(crate) fn new(
60+
cookie_key: Arc<RwLock<Option<Key>>>,
61+
cert_dir: PathBuf,
62+
reset_tx: broadcast::Sender<()>,
63+
) -> Self {
5564
Self {
5665
cookie_key,
5766
current_id: Arc::new(AtomicU64::new(1)),
@@ -60,6 +69,8 @@ impl ProxyServer {
6069
connected: Arc::new(AtomicBool::new(false)),
6170
core_version: Arc::new(Mutex::new(None)),
6271
config: Arc::new(Mutex::new(None)),
72+
cert_dir,
73+
reset_tx,
6374
}
6475
}
6576

@@ -79,7 +90,10 @@ impl ProxyServer {
7990
lock.clone()
8091
}
8192

82-
pub(crate) async fn run(self, addr: SocketAddr) -> Result<(), anyhow::Error> {
93+
pub(crate) async fn run<F>(self, addr: SocketAddr, shutdown: F) -> Result<(), anyhow::Error>
94+
where
95+
F: Future<Output = ()> + Send + 'static,
96+
{
8397
info!("Starting gRPC server on {addr}");
8498
let config = self.get_configuration();
8599
let (grpc_cert, grpc_key) = if let Some(cfg) = config {
@@ -107,7 +121,7 @@ impl ProxyServer {
107121

108122
builder
109123
.add_service(versioned_service)
110-
.serve(addr)
124+
.serve_with_shutdown(addr, shutdown)
111125
.await
112126
.map_err(|err| {
113127
error!("gRPC server error: {err}");
@@ -176,6 +190,8 @@ impl Clone for ProxyServer {
176190
core_version: Arc::clone(&self.core_version),
177191
cookie_key: Arc::clone(&self.cookie_key),
178192
config: Arc::clone(&self.config),
193+
cert_dir: self.cert_dir.clone(),
194+
reset_tx: self.reset_tx.clone(),
179195
}
180196
}
181197
}
@@ -272,4 +288,50 @@ impl proxy_server::Proxy for ProxyServer {
272288

273289
Ok(Response::new(UnboundedReceiverStream::new(rx)))
274290
}
291+
292+
#[instrument(skip(self, _request))]
293+
async fn purge(&self, _request: Request<()>) -> Result<Response<()>, Status> {
294+
debug!("Received purge request, removing gRPC certificate files");
295+
let cert_path = self.cert_dir.join(GRPC_CERT_NAME);
296+
let key_path = self.cert_dir.join(GRPC_KEY_NAME);
297+
298+
if let Err(err) = tokio::fs::remove_file(&cert_path).await {
299+
if err.kind() != std::io::ErrorKind::NotFound {
300+
error!(
301+
"Failed to remove gRPC certificate at {:?}: {err}",
302+
cert_path
303+
);
304+
return Err(Status::internal("Failed to remove gRPC certificate"));
305+
}
306+
}
307+
308+
if let Err(err) = tokio::fs::remove_file(&key_path).await {
309+
if err.kind() != std::io::ErrorKind::NotFound {
310+
error!("Failed to remove gRPC key at {:?}: {err}", key_path);
311+
return Err(Status::internal("Failed to remove gRPC key"));
312+
}
313+
}
314+
315+
*self
316+
.config
317+
.lock()
318+
.expect("Failed to lock config mutex during purge") = None;
319+
*self
320+
.core_version
321+
.lock()
322+
.expect("Failed to lock core_version mutex during purge") = None;
323+
*self
324+
.cookie_key
325+
.write()
326+
.expect("Failed to lock cookie key during purge") = None;
327+
self.connected.store(false, Ordering::Relaxed);
328+
329+
if self.reset_tx.send(()).is_err() {
330+
error!("Failed to notify reset handler");
331+
return Err(Status::internal("Failed to restart setup process"));
332+
}
333+
334+
info!("Removed gRPC certificate files; entering setup mode");
335+
Ok(Response::new(()))
336+
}
275337
}

src/http.rs

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -242,32 +242,41 @@ pub async fn run_server(
242242

243243
let mut tasks = JoinSet::new();
244244
let cookie_key = Default::default();
245+
let (reset_tx, mut reset_rx) = tokio::sync::broadcast::channel(1);
245246

246247
// connect to upstream gRPC server
247-
let grpc_server = ProxyServer::new(Arc::clone(&cookie_key));
248+
let grpc_server = ProxyServer::new(
249+
Arc::clone(&cookie_key),
250+
env_config.cert_dir.clone(),
251+
reset_tx,
252+
);
248253

249254
let server_clone = grpc_server.clone();
250255
let env_config_clone = env_config.clone();
251256

252257
// Start gRPC server.
253258
debug!("Spawning gRPC server task");
254259
tasks.spawn(async move {
255-
let proxy_configuration = if let Some(conf) = config {
256-
debug!("Using existing gRPC certificates, skipping setup process");
257-
conf
258-
} else if let Some(logs_rx) = logs_rx {
259-
info!("gRPC certificates not found, running setup process");
260-
let conf = run_setup(&env_config_clone, logs_rx).await?;
261-
info!("Setup process completed successfully");
262-
conf
263-
} else {
264-
anyhow::bail!(
265-
"gRPC certificates not found and logs receiver not available for setup process"
266-
);
267-
};
260+
let logs_rx = logs_rx.ok_or_else(|| {
261+
anyhow::anyhow!(
262+
"gRPC logs receiver not available for setup process; reset cannot be handled"
263+
)
264+
})?;
265+
let mut proxy_configuration = config;
268266

269-
server_clone.configure(proxy_configuration);
270267
loop {
268+
let configuration = if let Some(conf) = proxy_configuration.clone() {
269+
debug!("Using existing gRPC certificates, skipping setup process");
270+
conf
271+
} else {
272+
info!("gRPC certificates not found, running setup process");
273+
let conf = run_setup(&env_config_clone, Arc::clone(&logs_rx)).await?;
274+
info!("Setup process completed successfully");
275+
proxy_configuration = Some(conf.clone());
276+
conf
277+
};
278+
279+
server_clone.configure(configuration);
271280
info!("Starting gRPC server...");
272281
let server_to_run = server_clone.clone();
273282
let addr = SocketAddr::new(
@@ -276,9 +285,32 @@ pub async fn run_server(
276285
.unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)),
277286
env_config.grpc_port,
278287
);
279-
280-
if let Err(e) = server_to_run.run(addr).await {
281-
error!("gRPC server error: {e:?}, restarting...");
288+
let mut shutdown_rx = reset_rx.resubscribe();
289+
let mut server_task = tokio::spawn(async move {
290+
server_to_run
291+
.run(addr, async move {
292+
let _ = shutdown_rx.recv().await;
293+
})
294+
.await
295+
});
296+
297+
tokio::select! {
298+
result = &mut server_task => {
299+
match result {
300+
Ok(Ok(())) => {}
301+
Ok(Err(err)) => error!("gRPC server error: {err:?}, restarting..."),
302+
Err(err) => error!("gRPC server task error: {err:?}, restarting..."),
303+
}
304+
}
305+
result = reset_rx.recv() => {
306+
if result.is_ok() {
307+
info!("Reset requested, restarting setup process");
308+
proxy_configuration = None;
309+
} else {
310+
error!("Reset channel closed; gRPC server will keep running");
311+
}
312+
let _ = server_task.await;
313+
}
282314
}
283315
}
284316
});

src/main.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,11 @@ async fn main() -> anyhow::Result<()> {
3333
None
3434
};
3535

36-
let needs_setup = proxy_configuration.is_none();
37-
3836
// TODO: The channel size may need to be adjusted or some other approach should be used
3937
// to avoid dropping log messages.
40-
let (logs_tx, logs_rx) = if needs_setup {
38+
let (logs_tx, logs_rx) = {
4139
let (logs_tx, logs_rx) = mpsc::channel(200);
4240
(Some(logs_tx), Some(logs_rx))
43-
} else {
44-
(None, None)
4541
};
4642

4743
init_tracing(Version::parse(VERSION)?, &env_config.log_level, logs_tx)?;

0 commit comments

Comments
 (0)