Skip to content

Commit 1b78937

Browse files
committed
move acme trigger
1 parent d00690d commit 1b78937

4 files changed

Lines changed: 140 additions & 259 deletions

File tree

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ futures-util = "0.3"
5555
ammonia = "4.1"
5656
chrono = "0.4"
5757
axum-server = { version = "0.7", features = ["tls-rustls"] }
58-
rustls = { version = "0.23", default-features = false, features = ["aws-lc-rs"] }
58+
rustls = { version = "0.23", default-features = false, features = [
59+
"aws-lc-rs",
60+
] }
5961
instant-acme = { version = "0.8", features = ["hyper-rustls", "aws-lc-rs"] }
6062

6163
[build-dependencies]

src/grpc.rs

Lines changed: 120 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@ use tower::ServiceBuilder;
2525
use tracing::Instrument;
2626

2727
use crate::{
28-
MIN_CORE_VERSION, VERSION,
29-
acme,
30-
error::ApiError,
31-
http::{GRPC_CERT_NAME, GRPC_KEY_NAME},
32-
proto::{CoreRequest, CoreResponse, DeviceInfo, core_request, core_response, proxy_server},
28+
MIN_CORE_VERSION, VERSION, acme,
29+
acme::Port80Permit,
30+
error::ApiError,
31+
http::{GRPC_CERT_NAME, GRPC_KEY_NAME},
32+
proto::{
33+
AcmeCertificate, AcmeChallenge, AcmeIssueEvent, AcmeProgress, AcmeStep, CoreRequest,
34+
CoreResponse, DeviceInfo, acme_issue_event, core_request, core_response, proxy_server,
35+
},
3336
};
3437

3538
// connected clients
@@ -54,7 +57,7 @@ pub(crate) struct ProxyServer {
5457
https_cert_tx: broadcast::Sender<(String, String)>,
5558
/// `Some` only when the main HTTP server is bound to port 80.
5659
/// Used to hand off port 80 gracefully during ACME HTTP-01 challenges.
57-
port80_pause_tx: Option<mpsc::Sender<(oneshot::Sender<()>, oneshot::Receiver<()>)>>,
60+
port80_pause_tx: Option<mpsc::Sender<(oneshot::Sender<()>, oneshot::Receiver<()>)>>,
5861
}
5962

6063
impl ProxyServer {
@@ -65,7 +68,7 @@ impl ProxyServer {
6568
cert_dir: PathBuf,
6669
reset_tx: broadcast::Sender<()>,
6770
https_cert_tx: broadcast::Sender<(String, String)>,
68-
port80_pause_tx: Option<mpsc::Sender<(oneshot::Sender<()>, oneshot::Receiver<()>)>>,
71+
port80_pause_tx: Option<mpsc::Sender<(oneshot::Sender<()>, oneshot::Receiver<()>)>>,
6972
) -> Self {
7073
Self {
7174
cookie_key,
@@ -209,6 +212,7 @@ impl Clone for ProxyServer {
209212
#[tonic::async_trait]
210213
impl proxy_server::Proxy for ProxyServer {
211214
type BidiStream = UnboundedReceiverStream<Result<CoreRequest, Status>>;
215+
type TriggerAcmeStream = UnboundedReceiverStream<Result<AcmeIssueEvent, Status>>;
212216

213217
/// Handle bidirectional communication with Defguard core.
214218
#[instrument(name = "bidirectional_communication", level = "info", skip(self))]
@@ -247,13 +251,13 @@ impl proxy_server::Proxy for ProxyServer {
247251
.insert(address, tx);
248252
self.connected.store(true, Ordering::Relaxed);
249253

250-
let clients = Arc::clone(&self.clients);
251-
let results = Arc::clone(&self.results);
252-
let connected = Arc::clone(&self.connected);
253-
let cookie_key = Arc::clone(&self.cookie_key);
254-
let https_cert_tx = self.https_cert_tx.clone();
255-
let current_id = Arc::clone(&self.current_id);
256-
let port80_pause_tx = self.port80_pause_tx.clone();
254+
let clients = Arc::clone(&self.clients);
255+
let results = Arc::clone(&self.results);
256+
let connected = Arc::clone(&self.connected);
257+
let cookie_key = Arc::clone(&self.cookie_key);
258+
let https_cert_tx = self.https_cert_tx.clone();
259+
let current_id = Arc::clone(&self.current_id);
260+
let port80_pause_tx = self.port80_pause_tx.clone();
257261
tokio::spawn(
258262
async move {
259263
let mut stream = request.into_inner();
@@ -441,4 +445,106 @@ impl proxy_server::Proxy for ProxyServer {
441445
info!("Removed gRPC certificate files; entering setup mode");
442446
Ok(Response::new(()))
443447
}
448+
449+
#[instrument(skip(self, request))]
450+
async fn trigger_acme(
451+
&self,
452+
request: Request<AcmeChallenge>,
453+
) -> Result<Response<Self::TriggerAcmeStream>, Status> {
454+
let challenge = request.into_inner();
455+
let domain = challenge.domain.clone();
456+
let use_staging = challenge.use_staging;
457+
let existing_credentials = challenge.account_credentials_json.clone();
458+
459+
info!("Starting ACME HTTP-01 for domain: {domain} (staging={use_staging})");
460+
461+
let (tx, rx) = mpsc::unbounded_channel::<Result<AcmeIssueEvent, Status>>();
462+
463+
// Emit the first progress step immediately — we are connected and about to start.
464+
let _ = tx.send(Ok(AcmeIssueEvent {
465+
payload: Some(acme_issue_event::Payload::Progress(AcmeProgress {
466+
step: AcmeStep::Connecting as i32,
467+
})),
468+
}));
469+
470+
let pause_tx = self.port80_pause_tx.clone();
471+
tokio::spawn(async move {
472+
// Request a graceful hand-off of port 80 from the main HTTP server if it is bound
473+
// there, so the ACME challenge listener can bind.
474+
let permit: Option<Port80Permit> = if let Some(ref pause_tx) = pause_tx {
475+
let (ready_tx, ready_rx) = oneshot::channel::<()>();
476+
let (done_tx, done_rx) = oneshot::channel::<()>();
477+
if pause_tx.send((ready_tx, done_rx)).await.is_err() {
478+
error!(
479+
"Failed to request port-80 hand-off for ACME; \
480+
HTTP server may have stopped"
481+
);
482+
let _ = tx.send(Err(Status::internal(
483+
"Failed to request port-80 hand-off for ACME",
484+
)));
485+
return;
486+
}
487+
Some(Port80Permit {
488+
ready: ready_rx,
489+
done_tx,
490+
})
491+
} else {
492+
None
493+
};
494+
495+
// Channel used by run_acme_http01 to emit intermediate progress steps.
496+
let (progress_tx, mut progress_rx) = mpsc::unbounded_channel::<AcmeStep>();
497+
498+
// Forward progress steps from acme.rs onto the gRPC response stream.
499+
let tx_fwd = tx.clone();
500+
tokio::spawn(async move {
501+
while let Some(step) = progress_rx.recv().await {
502+
let event = AcmeIssueEvent {
503+
payload: Some(acme_issue_event::Payload::Progress(AcmeProgress {
504+
step: step as i32,
505+
})),
506+
};
507+
if tx_fwd.send(Ok(event)).is_err() {
508+
// Core disconnected — stop forwarding.
509+
break;
510+
}
511+
}
512+
});
513+
514+
match acme::run_acme_http01(
515+
domain.clone(),
516+
use_staging,
517+
existing_credentials,
518+
permit,
519+
progress_tx,
520+
)
521+
.await
522+
{
523+
Ok(acme_result) => {
524+
let cert_event = AcmeIssueEvent {
525+
payload: Some(acme_issue_event::Payload::Certificate(AcmeCertificate {
526+
cert_pem: acme_result.cert_pem,
527+
key_pem: acme_result.key_pem,
528+
account_credentials_json: acme_result.account_credentials_json,
529+
})),
530+
};
531+
if tx.send(Ok(cert_event)).is_err() {
532+
error!(
533+
"ACME result stream receiver disconnected before cert could be sent"
534+
);
535+
} else {
536+
info!("ACME certificate for domain '{domain}' streamed to Core");
537+
}
538+
}
539+
Err(err) => {
540+
error!("ACME HTTP-01 failed for domain '{domain}': {err}");
541+
let _ = tx.send(Err(Status::internal(format!(
542+
"ACME HTTP-01 certificate issuance failed: {err}"
543+
))));
544+
}
545+
}
546+
});
547+
548+
Ok(Response::new(UnboundedReceiverStream::new(rx)))
549+
}
444550
}

src/http.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -181,9 +181,8 @@ async fn powered_by_header<B>(mut response: Response<B>) -> Response<B> {
181181
pub async fn run_setup(
182182
env_config: &EnvConfig,
183183
logs_rx: LogsReceiver,
184-
port80_pause_tx: Option<mpsc::Sender<(oneshot::Sender<()>, oneshot::Receiver<()>)>>,
185184
) -> anyhow::Result<Configuration> {
186-
let setup_server = ProxySetupServer::new(logs_rx, port80_pause_tx);
185+
let setup_server = ProxySetupServer::new(logs_rx);
187186
let cert_dir = Path::new(&env_config.cert_dir);
188187
if !cert_dir.exists() {
189188
tokio::fs::create_dir_all(cert_dir).await.map_err(|err| {
@@ -326,7 +325,6 @@ pub async fn run_server(
326325
} else {
327326
(None, None)
328327
};
329-
let port80_pause_tx_for_setup = port80_pause_tx.clone();
330328

331329
// connect to upstream gRPC server
332330
let grpc_server = ProxyServer::new(
@@ -365,7 +363,6 @@ pub async fn run_server(
365363
let conf = run_setup(
366364
&env_config_clone,
367365
Arc::clone(&logs_rx),
368-
port80_pause_tx_for_setup.clone(),
369366
)
370367
.await?;
371368
info!("Setup process completed successfully");

0 commit comments

Comments
 (0)