Skip to content

Commit aba9552

Browse files
authored
fix(dgw): retrial logic when binding the listeners (#1525)
The listener binding will be reattempted a set amount of time when a transient error is hit, such as "address already in use". Issue: DGW-310
1 parent 8fbb6ef commit aba9552

1 file changed

Lines changed: 61 additions & 20 deletions

File tree

devolutions-gateway/src/service.rs

Lines changed: 61 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::time::Duration;
44
use anyhow::Context as _;
55
use devolutions_gateway::config::{Conf, ConfHandle};
66
use devolutions_gateway::credential::CredentialStoreHandle;
7-
use devolutions_gateway::listener::GatewayListener;
7+
use devolutions_gateway::listener::{GatewayListener, ListenerUrls};
88
use devolutions_gateway::log::GatewayLog;
99
use devolutions_gateway::recording::recording_message_channel;
1010
use devolutions_gateway::session::session_manager_channel;
@@ -290,25 +290,18 @@ async fn spawn_tasks(conf_handle: ConfHandle) -> anyhow::Result<Tasks> {
290290
traffic_audit_handle: traffic_audit_task.handle(),
291291
};
292292

293-
conf.listeners
294-
.iter()
295-
.map(|listener| {
296-
GatewayListener::init_and_bind(listener, state.clone())
297-
.with_context(|| format!("failed to initialize {}", listener.internal_url))
298-
.inspect(|_| {
299-
let _ = SYSTEM_LOGGER.emit(sysevent_codes::listener_started(
300-
&listener.internal_url,
301-
listener.internal_url.scheme(),
302-
));
303-
})
304-
.inspect_err(|error| {
305-
let _ = SYSTEM_LOGGER.emit(sysevent_codes::listener_bind_failed(&listener.internal_url, error));
306-
})
307-
})
308-
.collect::<anyhow::Result<Vec<GatewayListener>>>()
309-
.context("failed to bind listener")?
310-
.into_iter()
311-
.for_each(|listener| tasks.register(listener));
293+
for listener in &conf.listeners {
294+
let listener = bind_listener_with_retrial(state.clone(), listener)
295+
.await
296+
.with_context(|| {
297+
format!(
298+
"failed to bind listener: {} -> {}",
299+
listener.external_url, listener.internal_url
300+
)
301+
})?;
302+
303+
tasks.register(listener);
304+
}
312305

313306
if let Some(ngrok_conf) = &conf.ngrok {
314307
let session = devolutions_gateway::ngrok::NgrokSession::connect(ngrok_conf)
@@ -381,3 +374,51 @@ fn load_jrl_from_disk(config: &Conf) -> anyhow::Result<Arc<CurrentJrl>> {
381374

382375
Ok(Arc::new(Mutex::new(claims)))
383376
}
377+
378+
async fn bind_listener_with_retrial(state: DgwState, listener_urls: &ListenerUrls) -> anyhow::Result<GatewayListener> {
379+
const MAX_NUM_RETRIES: usize = 10;
380+
const SLEEP_DURATION: Duration = Duration::from_secs(10);
381+
382+
let mut count = 0;
383+
384+
loop {
385+
count += 1;
386+
387+
match GatewayListener::init_and_bind(listener_urls, state.clone()) {
388+
Ok(listener) => {
389+
let _ = SYSTEM_LOGGER.emit(sysevent_codes::listener_started(
390+
&listener_urls.internal_url,
391+
listener_urls.internal_url.scheme(),
392+
));
393+
394+
return Ok(listener);
395+
}
396+
Err(error) => {
397+
let address_already_in_use = error
398+
.source()
399+
.iter()
400+
.flat_map(|source| source.downcast_ref::<std::io::Error>())
401+
.any(|source| source.kind() == std::io::ErrorKind::AddrInUse);
402+
403+
if !address_already_in_use || count > MAX_NUM_RETRIES {
404+
let _ = SYSTEM_LOGGER.emit(sysevent_codes::listener_bind_failed(
405+
&listener_urls.internal_url,
406+
&error,
407+
));
408+
409+
return Err(error);
410+
}
411+
412+
warn!(
413+
error = format!("{error:#}"),
414+
count,
415+
"Failed to bind {}; retrying in {}s",
416+
listener_urls.internal_url,
417+
SLEEP_DURATION.as_secs()
418+
);
419+
420+
tokio::time::sleep(SLEEP_DURATION).await;
421+
}
422+
}
423+
}
424+
}

0 commit comments

Comments
 (0)