diff --git a/fact/src/config/mod.rs b/fact/src/config/mod.rs index 1ac7ba0e..fbae565a 100644 --- a/fact/src/config/mod.rs +++ b/fact/src/config/mod.rs @@ -332,6 +332,7 @@ pub struct BackoffConfig { max: Option, jitter: Option, multiplier: Option, + retries_max: Option, } impl BackoffConfig { @@ -348,6 +349,9 @@ impl BackoffConfig { if let Some(multiplier) = from.multiplier { self.multiplier = Some(multiplier); } + if let Some(retries) = from.retries_max { + self.retries_max = Some(retries); + } } pub fn initial(&self) -> Duration { @@ -365,6 +369,10 @@ impl BackoffConfig { pub fn multiplier(&self) -> f64 { self.multiplier.unwrap_or(1.5) } + + pub fn retries(&self) -> u64 { + self.retries_max.unwrap_or(10) + } } impl TryFrom<&yaml::Hash> for BackoffConfig { @@ -405,6 +413,12 @@ impl TryFrom<&yaml::Hash> for BackoffConfig { }; backoff.multiplier = Some(multiplier); } + "retries" => { + let Some(retries) = v.as_i64() else { + bail!("invalid grpc.backoff.retries: {v:?}"); + }; + backoff.retries_max = Some(retries as u64); + } name => bail!("Invalid field 'grpc.backoff.{name}' with value: {v:?}"), } } @@ -606,6 +620,13 @@ pub struct FactCli { #[arg(long, env = "FACT_GRPC_BACKOFF_JITTER")] backoff_jitter: Option, + /// Maximum number of times a gRPC connection will be attempted + /// before giving up + /// + /// 0 means infinite retries + #[arg(long, env = "FACT_GRPC_BACKOFF_RETRIES_MAX")] + backoff_retries_max: Option, + /// The port to bind for all exposed endpoints #[arg(long, short, env = "FACT_ENDPOINT_ADDRESS")] address: Option, @@ -701,6 +722,7 @@ impl FactCli { max: self.backoff_max, jitter: self.backoff_jitter, multiplier: self.backoff_multiplier, + retries_max: self.backoff_retries_max, }, }, endpoint: EndpointConfig { diff --git a/fact/src/config/tests.rs b/fact/src/config/tests.rs index e463f296..291b69ac 100644 --- a/fact/src/config/tests.rs +++ b/fact/src/config/tests.rs @@ -340,6 +340,23 @@ fn parsing() { ..Default::default() }, ), + ( + r#" + grpc: + backoff: + retries: 5 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + retries_max: Some(5), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), ( r#" grpc: @@ -348,6 +365,7 @@ fn parsing() { max: 120 jitter: false multiplier: 2 + retries: 5 "#, FactConfig { grpc: GrpcConfig { @@ -356,6 +374,7 @@ fn parsing() { max: Some(Duration::from_secs(120)), jitter: Some(false), multiplier: Some(2.0), + retries_max: Some(5), }, ..Default::default() }, @@ -374,6 +393,7 @@ fn parsing() { max: 120 jitter: false multiplier: 2 + retries: 5 endpoint: address: 0.0.0.0:8080 expose_metrics: true @@ -396,6 +416,7 @@ fn parsing() { max: Some(Duration::from_secs(120)), jitter: Some(false), multiplier: Some(2.0), + retries_max: Some(5), }, }, endpoint: EndpointConfig { @@ -543,6 +564,22 @@ paths: "#, "invalid grpc.backoff.multiplier: Real(\"0.5\")", ), + ( + r#" + grpc: + backoff: + retries: 0.5 + "#, + "invalid grpc.backoff.retries: Real(\"0.5\")", + ), + ( + r#" + grpc: + backoff: + retries: true + "#, + "invalid grpc.backoff.retries: Boolean(true)", + ), ( r#" grpc: @@ -1058,6 +1095,51 @@ fn update() { ..Default::default() }, ), + ( + r#" + grpc: + backoff: + retries: 5 + "#, + FactConfig::default(), + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + retries_max: Some(5), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + retries: 5 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + retries_max: Some(10), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + retries_max: Some(5), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), ( r#" endpoint: @@ -1425,6 +1507,7 @@ fn update() { max: 120 jitter: false multiplier: 3.0 + retries: 5 endpoint: address: 127.0.0.1:8080 expose_metrics: true @@ -1447,6 +1530,7 @@ fn update() { max: Some(Duration::from_secs(30)), jitter: Some(true), multiplier: Some(2.0), + retries_max: Some(20), }, }, endpoint: EndpointConfig { @@ -1474,6 +1558,7 @@ fn update() { max: Some(Duration::from_secs(120)), jitter: Some(false), multiplier: Some(3.0), + retries_max: Some(5), }, }, endpoint: EndpointConfig { @@ -1525,6 +1610,7 @@ fn defaults() { assert_eq!(config.grpc.backoff.max(), Duration::from_secs(60)); assert!(config.grpc.backoff.jitter()); assert_eq!(config.grpc.backoff.multiplier(), 1.5); + assert_eq!(config.grpc.backoff.retries(), 10); } static ENV_MUTEX: Mutex<()> = Mutex::new(()); @@ -1756,6 +1842,22 @@ fn env_vars() { ..Default::default() }, ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_RETRIES_MAX", + value: "5", + }, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + retries_max: Some(5), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), ( EnvVar { name: "FACT_ENDPOINT_ADDRESS", diff --git a/fact/src/lib.rs b/fact/src/lib.rs index bd9d708b..8ec187f6 100644 --- a/fact/src/lib.rs +++ b/fact/src/lib.rs @@ -1,6 +1,6 @@ use std::{borrow::BorrowMut, io::Write, str::FromStr}; -use anyhow::Context; +use anyhow::{Context, bail}; use bpf::Bpf; use host_info::{SystemInfo, get_distro, get_hostname}; use host_scanner::HostScanner; @@ -10,6 +10,7 @@ use rate_limiter::RateLimiter; use tokio::{ signal::unix::{SignalKind, signal}, sync::watch, + task::JoinError, }; mod bpf; @@ -64,6 +65,19 @@ pub fn log_system_information() { info!("Hostname: {}", get_hostname()); } +fn flatten_task_result( + component: &str, + res: Result, JoinError>, +) -> anyhow::Result<()> { + match res { + Ok(Ok(_)) => Ok(()), + Ok(Err(e)) => { + bail!("{component} worker errored out: {e:?}"); + } + Err(e) => bail!("{component} task errored out: {e:?}"), + } +} + pub async fn run(config: FactConfig) -> anyhow::Result<()> { // Log system information as early as possible so we have it // available in case of a crash @@ -99,13 +113,13 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> { exporter.metrics.rate_limiter.clone(), )?; - output::start( + let mut output_handle = output::start( rx, running.subscribe(), exporter.metrics.output.clone(), reloader.grpc(), reloader.config().json(), - )?; + ); let mut host_scanner_handle = host_scanner.start(); let mut rate_limiter_handle = rate_limiter.start(); endpoints::Server::new(exporter.clone(), reloader.endpoint(), running.subscribe()).start(); @@ -114,43 +128,28 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> { let mut sigterm = signal(SignalKind::terminate())?; let mut sighup = signal(SignalKind::hangup())?; - loop { + let res = loop { tokio::select! { - _ = tokio::signal::ctrl_c() => break, - _ = sigterm.recv() => break, + _ = tokio::signal::ctrl_c() => break Ok(()), + _ = sigterm.recv() => break Ok(()), _ = sighup.recv() => config_trigger.notify_one(), - res = bpf_handle.borrow_mut() => { - match res { - Ok(res) => if let Err(e) = res { - warn!("BPF worker errored out: {e:?}"); - } - Err(e) => warn!("BPF task errored out: {e:?}"), - } - break; + task_res = bpf_handle.borrow_mut() => { + break flatten_task_result("BPF", task_res); + } + task_res = host_scanner_handle.borrow_mut() => { + break flatten_task_result("HostScanner", task_res); } - res = host_scanner_handle.borrow_mut() => { - match res { - Ok(res) => if let Err(e) = res { - warn!("HostScanner worker errored out: {e:?}"); - } - Err(e) => warn!("HostScanner task errored out: {e:?}"), - } - break; + task_res = rate_limiter_handle.borrow_mut() => { + break flatten_task_result("Rate limiter", task_res); } - res = rate_limiter_handle.borrow_mut() => { - match res { - Ok(res) => if let Err(e) = res { - warn!("Rate limiter worker errored out: {e:?}"); - } - Err(e) => warn!("Rate limiter task errored out: {e:?}"), - } - break; + task_res = output_handle.borrow_mut() => { + break flatten_task_result("Output", task_res); } } - } + }; running.send(false)?; info!("Exiting..."); - Ok(()) + res } diff --git a/fact/src/output/grpc.rs b/fact/src/output/grpc.rs index 10dc0afa..311969b0 100644 --- a/fact/src/output/grpc.rs +++ b/fact/src/output/grpc.rs @@ -4,12 +4,13 @@ use anyhow::{Context, bail}; use fact_api::file_activity_service_client::FileActivityServiceClient; use hyper_tls::HttpsConnector; use hyper_util::client::legacy::connect::HttpConnector; -use log::{debug, info, warn}; +use log::{info, warn}; use native_tls::{Certificate, Identity}; use openssl::{ec::EcKey, pkey::PKey}; use tokio::{ fs, sync::{broadcast, watch}, + task::JoinHandle, time::sleep, }; use tokio_stream::{ @@ -30,13 +31,28 @@ struct Backoff { max: Duration, jitter: bool, multiplier: f64, + retries_max: u64, + retries_curr: u64, } impl Backoff { - fn new(initial: Duration, max: Duration, jitter: bool, multiplier: f64) -> Self { - if initial >= max { - warn!("Initial backoff value is equal or greater than max."); - } + fn new( + initial: Duration, + max: Duration, + jitter: bool, + multiplier: f64, + retries_max: u64, + ) -> Self { + let initial = if initial >= max { + warn!( + "Invalid initial value: {} >= {}", + initial.as_secs_f64(), + max.as_secs_f64() + ); + max + } else { + initial + }; Self { initial, @@ -44,22 +60,34 @@ impl Backoff { max, jitter, multiplier, + retries_max, + retries_curr: 0, } } - fn next(&mut self) -> Duration { - let delay = self.current.min(self.max); + fn next(&mut self) -> Option { + if self.retries_max != 0 { + if self.retries_curr >= self.retries_max { + return None; + } + self.retries_curr += 1; + } + + let delay = self.current; self.current = self.current.mul_f64(self.multiplier).min(self.max); - if self.jitter { + let delay = if self.jitter { let nanos = rand::random_range(0..=delay.as_nanos() as u64); Duration::from_nanos(nanos) } else { delay - } + }; + + Some(delay) } fn reset(&mut self) { self.current = self.initial; + self.retries_curr = 0; } } @@ -70,6 +98,7 @@ impl From<&BackoffConfig> for Backoff { value.max(), value.jitter(), value.multiplier(), + value.retries(), ) } } @@ -96,7 +125,7 @@ impl Client { } } - pub fn start(mut self) { + pub fn start(mut self) -> JoinHandle> { tokio::spawn(async move { loop { let res = if self.is_enabled() { @@ -111,10 +140,11 @@ impl Client { info!("Stopping gRPC output..."); break; } - Err(e) => warn!("gRPC error: {e:?}"), + Err(e) => bail!("gRPC error: {e:?}"), } } - }); + Ok(()) + }) } async fn get_connector(&self) -> anyhow::Result>> { @@ -184,8 +214,10 @@ impl Client { let channel = match self.create_channel(connector).await { Ok(channel) => channel, Err(e) => { - let delay = backoff.next(); - debug!("Failed to connect to server: {e:?}, retrying in {delay:?}"); + let Some(delay) = backoff.next() else { + bail!("Failed to connect to server: reconnection attempts exhausted"); + }; + info!("Failed to connect to server: {e:?}, retrying in {delay:?}"); sleep(delay).await; continue; } @@ -241,49 +273,79 @@ mod tests { #[test] fn backoff_exponential_2x() { - let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), false, 2.0); - assert_eq!(b.next(), Duration::from_secs(1)); - assert_eq!(b.next(), Duration::from_secs(2)); - assert_eq!(b.next(), Duration::from_secs(4)); - assert_eq!(b.next(), Duration::from_secs(8)); - assert_eq!(b.next(), Duration::from_secs(16)); - assert_eq!(b.next(), Duration::from_secs(32)); + let mut b = Backoff::new( + Duration::from_secs(1), + Duration::from_secs(60), + false, + 2.0, + 0, + ); + assert_eq!(b.next(), Some(Duration::from_secs(1))); + assert_eq!(b.next(), Some(Duration::from_secs(2))); + assert_eq!(b.next(), Some(Duration::from_secs(4))); + assert_eq!(b.next(), Some(Duration::from_secs(8))); + assert_eq!(b.next(), Some(Duration::from_secs(16))); + assert_eq!(b.next(), Some(Duration::from_secs(32))); } #[test] fn backoff_default_multiplier() { - let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), false, 1.5); - assert_eq!(b.next(), Duration::from_secs(1)); - assert_eq!(b.next(), Duration::from_millis(1500)); - assert_eq!(b.next(), Duration::from_millis(2250)); - assert_eq!(b.next(), Duration::from_millis(3375)); + let mut b = Backoff::new( + Duration::from_secs(1), + Duration::from_secs(60), + false, + 1.5, + 0, + ); + assert_eq!(b.next(), Some(Duration::from_secs(1))); + assert_eq!(b.next(), Some(Duration::from_millis(1500))); + assert_eq!(b.next(), Some(Duration::from_millis(2250))); + assert_eq!(b.next(), Some(Duration::from_millis(3375))); } #[test] fn backoff_caps_at_max() { - let mut b = Backoff::new(Duration::from_secs(32), Duration::from_secs(60), false, 2.0); - assert_eq!(b.next(), Duration::from_secs(32)); - assert_eq!(b.next(), Duration::from_secs(60)); - assert_eq!(b.next(), Duration::from_secs(60)); + let mut b = Backoff::new( + Duration::from_secs(32), + Duration::from_secs(60), + false, + 2.0, + 0, + ); + assert_eq!(b.next(), Some(Duration::from_secs(32))); + assert_eq!(b.next(), Some(Duration::from_secs(60))); + assert_eq!(b.next(), Some(Duration::from_secs(60))); } #[test] fn backoff_reset() { - let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), false, 2.0); - assert_eq!(b.next(), Duration::from_secs(1)); - assert_eq!(b.next(), Duration::from_secs(2)); - assert_eq!(b.next(), Duration::from_secs(4)); + let mut b = Backoff::new( + Duration::from_secs(1), + Duration::from_secs(60), + false, + 2.0, + 0, + ); + assert_eq!(b.next(), Some(Duration::from_secs(1))); + assert_eq!(b.next(), Some(Duration::from_secs(2))); + assert_eq!(b.next(), Some(Duration::from_secs(4))); b.reset(); - assert_eq!(b.next(), Duration::from_secs(1)); - assert_eq!(b.next(), Duration::from_secs(2)); + assert_eq!(b.next(), Some(Duration::from_secs(1))); + assert_eq!(b.next(), Some(Duration::from_secs(2))); } #[test] fn backoff_jitter_within_range() { - let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), true, 1.5); + let mut b = Backoff::new( + Duration::from_secs(1), + Duration::from_secs(60), + true, + 1.5, + 0, + ); let mut expected_max = Duration::from_secs(1); for _ in 0..100 { - let delay = b.next(); + let delay = b.next().expect("retries exhausted"); assert!( delay <= expected_max, "delay {delay:?} exceeded expected max {expected_max:?}" @@ -295,15 +357,68 @@ mod tests { #[test] fn backoff_jitter_reset() { - let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), true, 1.5); + let mut b = Backoff::new( + Duration::from_secs(1), + Duration::from_secs(60), + true, + 1.5, + 0, + ); for _ in 0..5 { b.next(); } b.reset(); - let delay = b.next(); + let delay = b.next().expect("retries exhausted"); assert!( delay <= Duration::from_secs(1), "delay {delay:?} exceeded 1s after reset" ); } + + #[test] + fn backoff_reconnection_give_up() { + let mut b = Backoff::new( + Duration::from_secs(1), + Duration::from_secs(60), + false, + 2.0, + 3, + ); + assert_eq!(b.next(), Some(Duration::from_secs(1))); + assert_eq!(b.next(), Some(Duration::from_secs(2))); + assert_eq!(b.next(), Some(Duration::from_secs(4))); + assert_eq!(b.next(), None); + } + + #[test] + fn backoff_reconnection_reset() { + let mut b = Backoff::new( + Duration::from_secs(1), + Duration::from_secs(60), + false, + 2.0, + 3, + ); + assert_eq!(b.next(), Some(Duration::from_secs(1))); + assert_eq!(b.next(), Some(Duration::from_secs(2))); + b.reset(); + assert_eq!(b.next(), Some(Duration::from_secs(1))); + assert_eq!(b.next(), Some(Duration::from_secs(2))); + assert_eq!(b.next(), Some(Duration::from_secs(4))); + assert_eq!(b.next(), None); + } + + #[test] + fn backoff_initial_greater_than_max() { + let mut b = Backoff::new( + Duration::from_secs(120), + Duration::from_secs(60), + false, + 2.0, + 0, + ); + assert_eq!(b.next(), Some(Duration::from_secs(60))); + assert_eq!(b.next(), Some(Duration::from_secs(60))); + assert_eq!(b.next(), Some(Duration::from_secs(60))); + } } diff --git a/fact/src/output/mod.rs b/fact/src/output/mod.rs index 23cda0b9..09a4eb85 100644 --- a/fact/src/output/mod.rs +++ b/fact/src/output/mod.rs @@ -1,7 +1,11 @@ -use std::sync::Arc; +use std::{borrow::BorrowMut, sync::Arc}; +use anyhow::bail; use log::{debug, warn}; -use tokio::sync::{broadcast, mpsc, watch}; +use tokio::{ + sync::{broadcast, mpsc, watch}, + task::JoinHandle, +}; use crate::{config::GrpcConfig, event::Event, metrics::OutputMetrics}; @@ -18,27 +22,9 @@ pub fn start( metrics: OutputMetrics, config: watch::Receiver, stdout_enabled: bool, -) -> anyhow::Result<()> { +) -> JoinHandle> { let (broad_tx, broad_rx) = broadcast::channel(100); let mut run = running.clone(); - tokio::spawn(async move { - debug!("Starting output component..."); - loop { - tokio::select! { - event = rx.recv() => { - let Some(event) = event else { - break; - }; - - if let Err(e) = broad_tx.send(Arc::new(event)) { - warn!("Failed to forward output event: {e}"); - } - } - _ = run.changed() => if !*run.borrow() { break; } - } - } - debug!("Stopping output component..."); - }); let grpc_client = grpc::Client::new( broad_rx.resubscribe(), @@ -58,7 +44,32 @@ pub fn start( .start(); } - grpc_client.start(); + let mut grpc_handle = grpc_client.start(); + + tokio::spawn(async move { + debug!("Starting output component..."); + loop { + tokio::select! { + event = rx.recv() => { + let Some(event) = event else { + break; + }; - Ok(()) + if let Err(e) = broad_tx.send(Arc::new(event)) { + warn!("Failed to forward output event: {e}"); + } + } + res = grpc_handle.borrow_mut() => { + match res { + Ok(Ok(_)) => break, + Ok(Err(e)) => bail!("grpc worker errored out: {e:?}"), + Err(e) => bail!("grpc task errored out: {e:?}"), + } + } + _ = run.changed() => if !*run.borrow() { break; } + } + } + debug!("Stopping output component..."); + Ok(()) + }) }