Skip to content

Commit 669f5be

Browse files
committed
feat(grpc): enforce a maximum number of reconnection attempts
If fact fails to connect to the gRPC server in the specified number of attemtps it will crash, which can be used as a clear signal in k8s that something is not working as expected. If the number of retries is set to 0, there will be no limit to the amount of times fact attempts to reconnect. In order to allow the reconnection failure to trigger an application wide crash, the main output task monitors the result of the grpc task's handle propagating the error further up. This method should allow for other output components to be added in the future and follow this same pattern without having to change anything outside the output module. The stdout component is not included in this logic because it has no condition that could merit an application wide crash. We also now propagate the error of worker tasks all the way up to the termination of the application.
1 parent 5dd2a85 commit 669f5be

5 files changed

Lines changed: 344 additions & 95 deletions

File tree

fact/src/config/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,7 @@ pub struct BackoffConfig {
332332
max: Option<Duration>,
333333
jitter: Option<bool>,
334334
multiplier: Option<f64>,
335+
retries_max: Option<u64>,
335336
}
336337

337338
impl BackoffConfig {
@@ -348,6 +349,9 @@ impl BackoffConfig {
348349
if let Some(multiplier) = from.multiplier {
349350
self.multiplier = Some(multiplier);
350351
}
352+
if let Some(retries) = from.retries_max {
353+
self.retries_max = Some(retries);
354+
}
351355
}
352356

353357
pub fn initial(&self) -> Duration {
@@ -365,6 +369,10 @@ impl BackoffConfig {
365369
pub fn multiplier(&self) -> f64 {
366370
self.multiplier.unwrap_or(1.5)
367371
}
372+
373+
pub fn retries(&self) -> u64 {
374+
self.retries_max.unwrap_or(10)
375+
}
368376
}
369377

370378
impl TryFrom<&yaml::Hash> for BackoffConfig {
@@ -405,6 +413,12 @@ impl TryFrom<&yaml::Hash> for BackoffConfig {
405413
};
406414
backoff.multiplier = Some(multiplier);
407415
}
416+
"retries" => {
417+
let Some(retries) = v.as_i64() else {
418+
bail!("invalid grpc.backoff.retries: {v:?}");
419+
};
420+
backoff.retries_max = Some(retries as u64);
421+
}
408422
name => bail!("Invalid field 'grpc.backoff.{name}' with value: {v:?}"),
409423
}
410424
}
@@ -606,6 +620,13 @@ pub struct FactCli {
606620
#[arg(long, env = "FACT_GRPC_BACKOFF_JITTER")]
607621
backoff_jitter: Option<bool>,
608622

623+
/// Maximum number of times a gRPC connection will be attempted
624+
/// before giving up
625+
///
626+
/// 0 means infinite retries
627+
#[arg(long, env = "FACT_GRPC_BACKOFF_RETRIES_MAX")]
628+
backoff_retries_max: Option<u64>,
629+
609630
/// The port to bind for all exposed endpoints
610631
#[arg(long, short, env = "FACT_ENDPOINT_ADDRESS")]
611632
address: Option<SocketAddr>,
@@ -701,6 +722,7 @@ impl FactCli {
701722
max: self.backoff_max,
702723
jitter: self.backoff_jitter,
703724
multiplier: self.backoff_multiplier,
725+
retries_max: self.backoff_retries_max,
704726
},
705727
},
706728
endpoint: EndpointConfig {

fact/src/config/tests.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,23 @@ fn parsing() {
340340
..Default::default()
341341
},
342342
),
343+
(
344+
r#"
345+
grpc:
346+
backoff:
347+
retries: 5
348+
"#,
349+
FactConfig {
350+
grpc: GrpcConfig {
351+
backoff: BackoffConfig {
352+
retries_max: Some(5),
353+
..Default::default()
354+
},
355+
..Default::default()
356+
},
357+
..Default::default()
358+
},
359+
),
343360
(
344361
r#"
345362
grpc:
@@ -348,6 +365,7 @@ fn parsing() {
348365
max: 120
349366
jitter: false
350367
multiplier: 2
368+
retries: 5
351369
"#,
352370
FactConfig {
353371
grpc: GrpcConfig {
@@ -356,6 +374,7 @@ fn parsing() {
356374
max: Some(Duration::from_secs(120)),
357375
jitter: Some(false),
358376
multiplier: Some(2.0),
377+
retries_max: Some(5),
359378
},
360379
..Default::default()
361380
},
@@ -374,6 +393,7 @@ fn parsing() {
374393
max: 120
375394
jitter: false
376395
multiplier: 2
396+
retries: 5
377397
endpoint:
378398
address: 0.0.0.0:8080
379399
expose_metrics: true
@@ -396,6 +416,7 @@ fn parsing() {
396416
max: Some(Duration::from_secs(120)),
397417
jitter: Some(false),
398418
multiplier: Some(2.0),
419+
retries_max: Some(5),
399420
},
400421
},
401422
endpoint: EndpointConfig {
@@ -543,6 +564,22 @@ paths:
543564
"#,
544565
"invalid grpc.backoff.multiplier: Real(\"0.5\")",
545566
),
567+
(
568+
r#"
569+
grpc:
570+
backoff:
571+
retries: 0.5
572+
"#,
573+
"invalid grpc.backoff.retries: Real(\"0.5\")",
574+
),
575+
(
576+
r#"
577+
grpc:
578+
backoff:
579+
retries: true
580+
"#,
581+
"invalid grpc.backoff.retries: Boolean(true)",
582+
),
546583
(
547584
r#"
548585
grpc:
@@ -1058,6 +1095,51 @@ fn update() {
10581095
..Default::default()
10591096
},
10601097
),
1098+
(
1099+
r#"
1100+
grpc:
1101+
backoff:
1102+
retries: 5
1103+
"#,
1104+
FactConfig::default(),
1105+
FactConfig {
1106+
grpc: GrpcConfig {
1107+
backoff: BackoffConfig {
1108+
retries_max: Some(5),
1109+
..Default::default()
1110+
},
1111+
..Default::default()
1112+
},
1113+
..Default::default()
1114+
},
1115+
),
1116+
(
1117+
r#"
1118+
grpc:
1119+
backoff:
1120+
retries: 5
1121+
"#,
1122+
FactConfig {
1123+
grpc: GrpcConfig {
1124+
backoff: BackoffConfig {
1125+
retries_max: Some(10),
1126+
..Default::default()
1127+
},
1128+
..Default::default()
1129+
},
1130+
..Default::default()
1131+
},
1132+
FactConfig {
1133+
grpc: GrpcConfig {
1134+
backoff: BackoffConfig {
1135+
retries_max: Some(5),
1136+
..Default::default()
1137+
},
1138+
..Default::default()
1139+
},
1140+
..Default::default()
1141+
},
1142+
),
10611143
(
10621144
r#"
10631145
endpoint:
@@ -1425,6 +1507,7 @@ fn update() {
14251507
max: 120
14261508
jitter: false
14271509
multiplier: 3.0
1510+
retries: 5
14281511
endpoint:
14291512
address: 127.0.0.1:8080
14301513
expose_metrics: true
@@ -1447,6 +1530,7 @@ fn update() {
14471530
max: Some(Duration::from_secs(30)),
14481531
jitter: Some(true),
14491532
multiplier: Some(2.0),
1533+
retries_max: Some(20),
14501534
},
14511535
},
14521536
endpoint: EndpointConfig {
@@ -1474,6 +1558,7 @@ fn update() {
14741558
max: Some(Duration::from_secs(120)),
14751559
jitter: Some(false),
14761560
multiplier: Some(3.0),
1561+
retries_max: Some(5),
14771562
},
14781563
},
14791564
endpoint: EndpointConfig {
@@ -1525,6 +1610,7 @@ fn defaults() {
15251610
assert_eq!(config.grpc.backoff.max(), Duration::from_secs(60));
15261611
assert!(config.grpc.backoff.jitter());
15271612
assert_eq!(config.grpc.backoff.multiplier(), 1.5);
1613+
assert_eq!(config.grpc.backoff.retries(), 10);
15281614
}
15291615

15301616
static ENV_MUTEX: Mutex<()> = Mutex::new(());
@@ -1756,6 +1842,22 @@ fn env_vars() {
17561842
..Default::default()
17571843
},
17581844
),
1845+
(
1846+
EnvVar {
1847+
name: "FACT_GRPC_BACKOFF_RETRIES_MAX",
1848+
value: "5",
1849+
},
1850+
FactConfig {
1851+
grpc: GrpcConfig {
1852+
backoff: BackoffConfig {
1853+
retries_max: Some(5),
1854+
..Default::default()
1855+
},
1856+
..Default::default()
1857+
},
1858+
..Default::default()
1859+
},
1860+
),
17591861
(
17601862
EnvVar {
17611863
name: "FACT_ENDPOINT_ADDRESS",

fact/src/lib.rs

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{borrow::BorrowMut, io::Write, str::FromStr};
22

3-
use anyhow::Context;
3+
use anyhow::{Context, bail};
44
use bpf::Bpf;
55
use host_info::{SystemInfo, get_distro, get_hostname};
66
use host_scanner::HostScanner;
@@ -10,6 +10,7 @@ use rate_limiter::RateLimiter;
1010
use tokio::{
1111
signal::unix::{SignalKind, signal},
1212
sync::watch,
13+
task::JoinError,
1314
};
1415

1516
mod bpf;
@@ -64,6 +65,19 @@ pub fn log_system_information() {
6465
info!("Hostname: {}", get_hostname());
6566
}
6667

68+
fn flatten_task_result(
69+
component: &str,
70+
res: Result<anyhow::Result<()>, JoinError>,
71+
) -> anyhow::Result<()> {
72+
match res {
73+
Ok(Ok(_)) => Ok(()),
74+
Ok(Err(e)) => {
75+
bail!("{component} worker errored out: {e:?}");
76+
}
77+
Err(e) => bail!("{component} task errored out: {e:?}"),
78+
}
79+
}
80+
6781
pub async fn run(config: FactConfig) -> anyhow::Result<()> {
6882
// Log system information as early as possible so we have it
6983
// available in case of a crash
@@ -99,13 +113,13 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> {
99113
exporter.metrics.rate_limiter.clone(),
100114
)?;
101115

102-
output::start(
116+
let mut output_handle = output::start(
103117
rx,
104118
running.subscribe(),
105119
exporter.metrics.output.clone(),
106120
reloader.grpc(),
107121
reloader.config().json(),
108-
)?;
122+
);
109123
let mut host_scanner_handle = host_scanner.start();
110124
let mut rate_limiter_handle = rate_limiter.start();
111125
endpoints::Server::new(exporter.clone(), reloader.endpoint(), running.subscribe()).start();
@@ -114,43 +128,28 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> {
114128

115129
let mut sigterm = signal(SignalKind::terminate())?;
116130
let mut sighup = signal(SignalKind::hangup())?;
117-
loop {
131+
let res = loop {
118132
tokio::select! {
119-
_ = tokio::signal::ctrl_c() => break,
120-
_ = sigterm.recv() => break,
133+
_ = tokio::signal::ctrl_c() => break Ok(()),
134+
_ = sigterm.recv() => break Ok(()),
121135
_ = sighup.recv() => config_trigger.notify_one(),
122-
res = bpf_handle.borrow_mut() => {
123-
match res {
124-
Ok(res) => if let Err(e) = res {
125-
warn!("BPF worker errored out: {e:?}");
126-
}
127-
Err(e) => warn!("BPF task errored out: {e:?}"),
128-
}
129-
break;
136+
task_res = bpf_handle.borrow_mut() => {
137+
break flatten_task_result("BPF", task_res);
138+
}
139+
task_res = host_scanner_handle.borrow_mut() => {
140+
break flatten_task_result("HostScanner", task_res);
130141
}
131-
res = host_scanner_handle.borrow_mut() => {
132-
match res {
133-
Ok(res) => if let Err(e) = res {
134-
warn!("HostScanner worker errored out: {e:?}");
135-
}
136-
Err(e) => warn!("HostScanner task errored out: {e:?}"),
137-
}
138-
break;
142+
task_res = rate_limiter_handle.borrow_mut() => {
143+
break flatten_task_result("Rate limiter", task_res);
139144
}
140-
res = rate_limiter_handle.borrow_mut() => {
141-
match res {
142-
Ok(res) => if let Err(e) = res {
143-
warn!("Rate limiter worker errored out: {e:?}");
144-
}
145-
Err(e) => warn!("Rate limiter task errored out: {e:?}"),
146-
}
147-
break;
145+
task_res = output_handle.borrow_mut() => {
146+
break flatten_task_result("Output", task_res);
148147
}
149148
}
150-
}
149+
};
151150

152151
running.send(false)?;
153152
info!("Exiting...");
154153

155-
Ok(())
154+
res
156155
}

0 commit comments

Comments
 (0)