From 669f5be00238665b83095afd668dbdb2d196c9d8 Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Wed, 24 Jun 2026 17:11:00 +0200 Subject: [PATCH] feat(grpc): enforce a maximum number of reconnection attempts If fact fails to connect to the gRPC server in the specified number of attempts it will crash, which can be used as a clear signal in k8s that something is not working as expected. If the number of retries is set to 0, there will be no limit to the number of times fact attempts to reconnect. In order to allow the reconnection failure to trigger an application-wide crash, the main output task monitors the result of the grpc task's handle, propagating the error further up. This method should allow for other output components to be added in the future and follow this same pattern without having to change anything outside the output module. The stdout component is not included in this logic because it has no condition that could merit an application-wide crash. We also now propagate the error of worker tasks all the way up to the termination of the application. 
--- fact/src/config/mod.rs | 22 +++++ fact/src/config/tests.rs | 102 ++++++++++++++++++++ fact/src/lib.rs | 63 +++++++------ fact/src/output/grpc.rs | 195 +++++++++++++++++++++++++++++++-------- fact/src/output/mod.rs | 57 +++++++----- 5 files changed, 344 insertions(+), 95 deletions(-) diff --git a/fact/src/config/mod.rs b/fact/src/config/mod.rs index 1ac7ba0e..fbae565a 100644 --- a/fact/src/config/mod.rs +++ b/fact/src/config/mod.rs @@ -332,6 +332,7 @@ pub struct BackoffConfig { max: Option, jitter: Option, multiplier: Option, + retries_max: Option, } impl BackoffConfig { @@ -348,6 +349,9 @@ impl BackoffConfig { if let Some(multiplier) = from.multiplier { self.multiplier = Some(multiplier); } + if let Some(retries) = from.retries_max { + self.retries_max = Some(retries); + } } pub fn initial(&self) -> Duration { @@ -365,6 +369,10 @@ impl BackoffConfig { pub fn multiplier(&self) -> f64 { self.multiplier.unwrap_or(1.5) } + + pub fn retries(&self) -> u64 { + self.retries_max.unwrap_or(10) + } } impl TryFrom<&yaml::Hash> for BackoffConfig { @@ -405,6 +413,12 @@ impl TryFrom<&yaml::Hash> for BackoffConfig { }; backoff.multiplier = Some(multiplier); } + "retries" => { + let Some(retries) = v.as_i64() else { + bail!("invalid grpc.backoff.retries: {v:?}"); + }; + backoff.retries_max = Some(retries as u64); + } name => bail!("Invalid field 'grpc.backoff.{name}' with value: {v:?}"), } } @@ -606,6 +620,13 @@ pub struct FactCli { #[arg(long, env = "FACT_GRPC_BACKOFF_JITTER")] backoff_jitter: Option, + /// Maximum number of times a gRPC connection will be attempted + /// before giving up + /// + /// 0 means infinite retries + #[arg(long, env = "FACT_GRPC_BACKOFF_RETRIES_MAX")] + backoff_retries_max: Option, + /// The port to bind for all exposed endpoints #[arg(long, short, env = "FACT_ENDPOINT_ADDRESS")] address: Option, @@ -701,6 +722,7 @@ impl FactCli { max: self.backoff_max, jitter: self.backoff_jitter, multiplier: self.backoff_multiplier, + retries_max: 
self.backoff_retries_max, }, }, endpoint: EndpointConfig { diff --git a/fact/src/config/tests.rs b/fact/src/config/tests.rs index e463f296..291b69ac 100644 --- a/fact/src/config/tests.rs +++ b/fact/src/config/tests.rs @@ -340,6 +340,23 @@ fn parsing() { ..Default::default() }, ), + ( + r#" + grpc: + backoff: + retries: 5 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + retries_max: Some(5), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), ( r#" grpc: @@ -348,6 +365,7 @@ fn parsing() { max: 120 jitter: false multiplier: 2 + retries: 5 "#, FactConfig { grpc: GrpcConfig { @@ -356,6 +374,7 @@ fn parsing() { max: Some(Duration::from_secs(120)), jitter: Some(false), multiplier: Some(2.0), + retries_max: Some(5), }, ..Default::default() }, @@ -374,6 +393,7 @@ fn parsing() { max: 120 jitter: false multiplier: 2 + retries: 5 endpoint: address: 0.0.0.0:8080 expose_metrics: true @@ -396,6 +416,7 @@ fn parsing() { max: Some(Duration::from_secs(120)), jitter: Some(false), multiplier: Some(2.0), + retries_max: Some(5), }, }, endpoint: EndpointConfig { @@ -543,6 +564,22 @@ paths: "#, "invalid grpc.backoff.multiplier: Real(\"0.5\")", ), + ( + r#" + grpc: + backoff: + retries: 0.5 + "#, + "invalid grpc.backoff.retries: Real(\"0.5\")", + ), + ( + r#" + grpc: + backoff: + retries: true + "#, + "invalid grpc.backoff.retries: Boolean(true)", + ), ( r#" grpc: @@ -1058,6 +1095,51 @@ fn update() { ..Default::default() }, ), + ( + r#" + grpc: + backoff: + retries: 5 + "#, + FactConfig::default(), + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + retries_max: Some(5), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + retries: 5 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + retries_max: Some(10), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + FactConfig { + grpc: GrpcConfig { + 
backoff: BackoffConfig { + retries_max: Some(5), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), ( r#" endpoint: @@ -1425,6 +1507,7 @@ fn update() { max: 120 jitter: false multiplier: 3.0 + retries: 5 endpoint: address: 127.0.0.1:8080 expose_metrics: true @@ -1447,6 +1530,7 @@ fn update() { max: Some(Duration::from_secs(30)), jitter: Some(true), multiplier: Some(2.0), + retries_max: Some(20), }, }, endpoint: EndpointConfig { @@ -1474,6 +1558,7 @@ fn update() { max: Some(Duration::from_secs(120)), jitter: Some(false), multiplier: Some(3.0), + retries_max: Some(5), }, }, endpoint: EndpointConfig { @@ -1525,6 +1610,7 @@ fn defaults() { assert_eq!(config.grpc.backoff.max(), Duration::from_secs(60)); assert!(config.grpc.backoff.jitter()); assert_eq!(config.grpc.backoff.multiplier(), 1.5); + assert_eq!(config.grpc.backoff.retries(), 10); } static ENV_MUTEX: Mutex<()> = Mutex::new(()); @@ -1756,6 +1842,22 @@ fn env_vars() { ..Default::default() }, ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_RETRIES_MAX", + value: "5", + }, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + retries_max: Some(5), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), ( EnvVar { name: "FACT_ENDPOINT_ADDRESS", diff --git a/fact/src/lib.rs b/fact/src/lib.rs index bd9d708b..8ec187f6 100644 --- a/fact/src/lib.rs +++ b/fact/src/lib.rs @@ -1,6 +1,6 @@ use std::{borrow::BorrowMut, io::Write, str::FromStr}; -use anyhow::Context; +use anyhow::{Context, bail}; use bpf::Bpf; use host_info::{SystemInfo, get_distro, get_hostname}; use host_scanner::HostScanner; @@ -10,6 +10,7 @@ use rate_limiter::RateLimiter; use tokio::{ signal::unix::{SignalKind, signal}, sync::watch, + task::JoinError, }; mod bpf; @@ -64,6 +65,19 @@ pub fn log_system_information() { info!("Hostname: {}", get_hostname()); } +fn flatten_task_result( + component: &str, + res: Result, JoinError>, +) -> anyhow::Result<()> { + match res { + Ok(Ok(_)) 
=> Ok(()), + Ok(Err(e)) => { + bail!("{component} worker errored out: {e:?}"); + } + Err(e) => bail!("{component} task errored out: {e:?}"), + } +} + pub async fn run(config: FactConfig) -> anyhow::Result<()> { // Log system information as early as possible so we have it // available in case of a crash @@ -99,13 +113,13 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> { exporter.metrics.rate_limiter.clone(), )?; - output::start( + let mut output_handle = output::start( rx, running.subscribe(), exporter.metrics.output.clone(), reloader.grpc(), reloader.config().json(), - )?; + ); let mut host_scanner_handle = host_scanner.start(); let mut rate_limiter_handle = rate_limiter.start(); endpoints::Server::new(exporter.clone(), reloader.endpoint(), running.subscribe()).start(); @@ -114,43 +128,28 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> { let mut sigterm = signal(SignalKind::terminate())?; let mut sighup = signal(SignalKind::hangup())?; - loop { + let res = loop { tokio::select! 
{ - _ = tokio::signal::ctrl_c() => break, - _ = sigterm.recv() => break, + _ = tokio::signal::ctrl_c() => break Ok(()), + _ = sigterm.recv() => break Ok(()), _ = sighup.recv() => config_trigger.notify_one(), - res = bpf_handle.borrow_mut() => { - match res { - Ok(res) => if let Err(e) = res { - warn!("BPF worker errored out: {e:?}"); - } - Err(e) => warn!("BPF task errored out: {e:?}"), - } - break; + task_res = bpf_handle.borrow_mut() => { + break flatten_task_result("BPF", task_res); + } + task_res = host_scanner_handle.borrow_mut() => { + break flatten_task_result("HostScanner", task_res); } - res = host_scanner_handle.borrow_mut() => { - match res { - Ok(res) => if let Err(e) = res { - warn!("HostScanner worker errored out: {e:?}"); - } - Err(e) => warn!("HostScanner task errored out: {e:?}"), - } - break; + task_res = rate_limiter_handle.borrow_mut() => { + break flatten_task_result("Rate limiter", task_res); } - res = rate_limiter_handle.borrow_mut() => { - match res { - Ok(res) => if let Err(e) = res { - warn!("Rate limiter worker errored out: {e:?}"); - } - Err(e) => warn!("Rate limiter task errored out: {e:?}"), - } - break; + task_res = output_handle.borrow_mut() => { + break flatten_task_result("Output", task_res); } } - } + }; running.send(false)?; info!("Exiting..."); - Ok(()) + res } diff --git a/fact/src/output/grpc.rs b/fact/src/output/grpc.rs index 10dc0afa..311969b0 100644 --- a/fact/src/output/grpc.rs +++ b/fact/src/output/grpc.rs @@ -4,12 +4,13 @@ use anyhow::{Context, bail}; use fact_api::file_activity_service_client::FileActivityServiceClient; use hyper_tls::HttpsConnector; use hyper_util::client::legacy::connect::HttpConnector; -use log::{debug, info, warn}; +use log::{info, warn}; use native_tls::{Certificate, Identity}; use openssl::{ec::EcKey, pkey::PKey}; use tokio::{ fs, sync::{broadcast, watch}, + task::JoinHandle, time::sleep, }; use tokio_stream::{ @@ -30,13 +31,28 @@ struct Backoff { max: Duration, jitter: bool, multiplier: f64, + 
retries_max: u64, + retries_curr: u64, } impl Backoff { - fn new(initial: Duration, max: Duration, jitter: bool, multiplier: f64) -> Self { - if initial >= max { - warn!("Initial backoff value is equal or greater than max."); - } + fn new( + initial: Duration, + max: Duration, + jitter: bool, + multiplier: f64, + retries_max: u64, + ) -> Self { + let initial = if initial >= max { + warn!( + "Invalid initial value: {} >= {}", + initial.as_secs_f64(), + max.as_secs_f64() + ); + max + } else { + initial + }; Self { initial, @@ -44,22 +60,34 @@ impl Backoff { max, jitter, multiplier, + retries_max, + retries_curr: 0, } } - fn next(&mut self) -> Duration { - let delay = self.current.min(self.max); + fn next(&mut self) -> Option { + if self.retries_max != 0 { + if self.retries_curr >= self.retries_max { + return None; + } + self.retries_curr += 1; + } + + let delay = self.current; self.current = self.current.mul_f64(self.multiplier).min(self.max); - if self.jitter { + let delay = if self.jitter { let nanos = rand::random_range(0..=delay.as_nanos() as u64); Duration::from_nanos(nanos) } else { delay - } + }; + + Some(delay) } fn reset(&mut self) { self.current = self.initial; + self.retries_curr = 0; } } @@ -70,6 +98,7 @@ impl From<&BackoffConfig> for Backoff { value.max(), value.jitter(), value.multiplier(), + value.retries(), ) } } @@ -96,7 +125,7 @@ impl Client { } } - pub fn start(mut self) { + pub fn start(mut self) -> JoinHandle> { tokio::spawn(async move { loop { let res = if self.is_enabled() { @@ -111,10 +140,11 @@ impl Client { info!("Stopping gRPC output..."); break; } - Err(e) => warn!("gRPC error: {e:?}"), + Err(e) => bail!("gRPC error: {e:?}"), } } - }); + Ok(()) + }) } async fn get_connector(&self) -> anyhow::Result>> { @@ -184,8 +214,10 @@ impl Client { let channel = match self.create_channel(connector).await { Ok(channel) => channel, Err(e) => { - let delay = backoff.next(); - debug!("Failed to connect to server: {e:?}, retrying in {delay:?}"); + let 
Some(delay) = backoff.next() else { + bail!("Failed to connect to server: reconnection attempts exhausted"); + }; + info!("Failed to connect to server: {e:?}, retrying in {delay:?}"); sleep(delay).await; continue; } @@ -241,49 +273,79 @@ mod tests { #[test] fn backoff_exponential_2x() { - let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), false, 2.0); - assert_eq!(b.next(), Duration::from_secs(1)); - assert_eq!(b.next(), Duration::from_secs(2)); - assert_eq!(b.next(), Duration::from_secs(4)); - assert_eq!(b.next(), Duration::from_secs(8)); - assert_eq!(b.next(), Duration::from_secs(16)); - assert_eq!(b.next(), Duration::from_secs(32)); + let mut b = Backoff::new( + Duration::from_secs(1), + Duration::from_secs(60), + false, + 2.0, + 0, + ); + assert_eq!(b.next(), Some(Duration::from_secs(1))); + assert_eq!(b.next(), Some(Duration::from_secs(2))); + assert_eq!(b.next(), Some(Duration::from_secs(4))); + assert_eq!(b.next(), Some(Duration::from_secs(8))); + assert_eq!(b.next(), Some(Duration::from_secs(16))); + assert_eq!(b.next(), Some(Duration::from_secs(32))); } #[test] fn backoff_default_multiplier() { - let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), false, 1.5); - assert_eq!(b.next(), Duration::from_secs(1)); - assert_eq!(b.next(), Duration::from_millis(1500)); - assert_eq!(b.next(), Duration::from_millis(2250)); - assert_eq!(b.next(), Duration::from_millis(3375)); + let mut b = Backoff::new( + Duration::from_secs(1), + Duration::from_secs(60), + false, + 1.5, + 0, + ); + assert_eq!(b.next(), Some(Duration::from_secs(1))); + assert_eq!(b.next(), Some(Duration::from_millis(1500))); + assert_eq!(b.next(), Some(Duration::from_millis(2250))); + assert_eq!(b.next(), Some(Duration::from_millis(3375))); } #[test] fn backoff_caps_at_max() { - let mut b = Backoff::new(Duration::from_secs(32), Duration::from_secs(60), false, 2.0); - assert_eq!(b.next(), Duration::from_secs(32)); - assert_eq!(b.next(), 
Duration::from_secs(60)); - assert_eq!(b.next(), Duration::from_secs(60)); + let mut b = Backoff::new( + Duration::from_secs(32), + Duration::from_secs(60), + false, + 2.0, + 0, + ); + assert_eq!(b.next(), Some(Duration::from_secs(32))); + assert_eq!(b.next(), Some(Duration::from_secs(60))); + assert_eq!(b.next(), Some(Duration::from_secs(60))); } #[test] fn backoff_reset() { - let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), false, 2.0); - assert_eq!(b.next(), Duration::from_secs(1)); - assert_eq!(b.next(), Duration::from_secs(2)); - assert_eq!(b.next(), Duration::from_secs(4)); + let mut b = Backoff::new( + Duration::from_secs(1), + Duration::from_secs(60), + false, + 2.0, + 0, + ); + assert_eq!(b.next(), Some(Duration::from_secs(1))); + assert_eq!(b.next(), Some(Duration::from_secs(2))); + assert_eq!(b.next(), Some(Duration::from_secs(4))); b.reset(); - assert_eq!(b.next(), Duration::from_secs(1)); - assert_eq!(b.next(), Duration::from_secs(2)); + assert_eq!(b.next(), Some(Duration::from_secs(1))); + assert_eq!(b.next(), Some(Duration::from_secs(2))); } #[test] fn backoff_jitter_within_range() { - let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), true, 1.5); + let mut b = Backoff::new( + Duration::from_secs(1), + Duration::from_secs(60), + true, + 1.5, + 0, + ); let mut expected_max = Duration::from_secs(1); for _ in 0..100 { - let delay = b.next(); + let delay = b.next().expect("retries exhausted"); assert!( delay <= expected_max, "delay {delay:?} exceeded expected max {expected_max:?}" @@ -295,15 +357,68 @@ mod tests { #[test] fn backoff_jitter_reset() { - let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), true, 1.5); + let mut b = Backoff::new( + Duration::from_secs(1), + Duration::from_secs(60), + true, + 1.5, + 0, + ); for _ in 0..5 { b.next(); } b.reset(); - let delay = b.next(); + let delay = b.next().expect("retries exhausted"); assert!( delay <= Duration::from_secs(1), "delay 
{delay:?} exceeded 1s after reset" ); } + + #[test] + fn backoff_reconnection_give_up() { + let mut b = Backoff::new( + Duration::from_secs(1), + Duration::from_secs(60), + false, + 2.0, + 3, + ); + assert_eq!(b.next(), Some(Duration::from_secs(1))); + assert_eq!(b.next(), Some(Duration::from_secs(2))); + assert_eq!(b.next(), Some(Duration::from_secs(4))); + assert_eq!(b.next(), None); + } + + #[test] + fn backoff_reconnection_reset() { + let mut b = Backoff::new( + Duration::from_secs(1), + Duration::from_secs(60), + false, + 2.0, + 3, + ); + assert_eq!(b.next(), Some(Duration::from_secs(1))); + assert_eq!(b.next(), Some(Duration::from_secs(2))); + b.reset(); + assert_eq!(b.next(), Some(Duration::from_secs(1))); + assert_eq!(b.next(), Some(Duration::from_secs(2))); + assert_eq!(b.next(), Some(Duration::from_secs(4))); + assert_eq!(b.next(), None); + } + + #[test] + fn backoff_initial_greater_than_max() { + let mut b = Backoff::new( + Duration::from_secs(120), + Duration::from_secs(60), + false, + 2.0, + 0, + ); + assert_eq!(b.next(), Some(Duration::from_secs(60))); + assert_eq!(b.next(), Some(Duration::from_secs(60))); + assert_eq!(b.next(), Some(Duration::from_secs(60))); + } } diff --git a/fact/src/output/mod.rs b/fact/src/output/mod.rs index 23cda0b9..09a4eb85 100644 --- a/fact/src/output/mod.rs +++ b/fact/src/output/mod.rs @@ -1,7 +1,11 @@ -use std::sync::Arc; +use std::{borrow::BorrowMut, sync::Arc}; +use anyhow::bail; use log::{debug, warn}; -use tokio::sync::{broadcast, mpsc, watch}; +use tokio::{ + sync::{broadcast, mpsc, watch}, + task::JoinHandle, +}; use crate::{config::GrpcConfig, event::Event, metrics::OutputMetrics}; @@ -18,27 +22,9 @@ pub fn start( metrics: OutputMetrics, config: watch::Receiver, stdout_enabled: bool, -) -> anyhow::Result<()> { +) -> JoinHandle> { let (broad_tx, broad_rx) = broadcast::channel(100); let mut run = running.clone(); - tokio::spawn(async move { - debug!("Starting output component..."); - loop { - tokio::select! 
{ - event = rx.recv() => { - let Some(event) = event else { - break; - }; - - if let Err(e) = broad_tx.send(Arc::new(event)) { - warn!("Failed to forward output event: {e}"); - } - } - _ = run.changed() => if !*run.borrow() { break; } - } - } - debug!("Stopping output component..."); - }); let grpc_client = grpc::Client::new( broad_rx.resubscribe(), @@ -58,7 +44,32 @@ pub fn start( .start(); } - grpc_client.start(); + let mut grpc_handle = grpc_client.start(); + + tokio::spawn(async move { + debug!("Starting output component..."); + loop { + tokio::select! { + event = rx.recv() => { + let Some(event) = event else { + break; + }; - Ok(()) + if let Err(e) = broad_tx.send(Arc::new(event)) { + warn!("Failed to forward output event: {e}"); + } + } + res = grpc_handle.borrow_mut() => { + match res { + Ok(Ok(_)) => break, + Ok(Err(e)) => bail!("grpc worker errored out: {e:?}"), + Err(e) => bail!("grpc task errored out: {e:?}"), + } + } + _ = run.changed() => if !*run.borrow() { break; } + } + } + debug!("Stopping output component..."); + Ok(()) + }) }