Skip to content

Commit 5dd2a85

Browse files
authored
feat(grpc): add exponential backoff for reconnection attempts (#789)
* feat(grpc): add exponential backoff for reconnection attempts Assisted-by: claude-opus-4-6@default <noreply@opencode.ai> * feat(config): make backoff initial/max configurable via YAML, CLI, and env vars Add BackoffConfig to GrpcConfig with YAML (grpc.backoff.initial, grpc.backoff.max), CLI (--backoff-initial, --backoff-max), and env var (FACT_GRPC_BACKOFF_INITIAL, FACT_GRPC_BACKOFF_MAX) support. Extract yaml_to_duration_secs and parse_duration_secs helpers, reusing them for scan_interval parsing as well. Assisted-by: claude-opus-4-6@default <noreply@opencode.ai> * cleanup(grpc): simplify Backoff creation * chore: add changelog line * feat(grpc): add full jitter to backoff reconnection Add configurable jitter to the exponential backoff using rand crate for uniform distribution over [0, delay]. Jitter is enabled by default and can be toggled via YAML (grpc.backoff.jitter), CLI (--backoff-jitter / --no-backoff-jitter), or env var (FACT_GRPC_NO_BACKOFF_JITTER). Assisted-by: claude-opus-4-6@default <noreply@opencode.ai> * feat(config): make backoff multiplier configurable Add multiplier field to BackoffConfig (default 1.5x). Configurable via YAML (grpc.backoff.multiplier), CLI (--backoff-multiplier), and env var (FACT_GRPC_BACKOFF_MULTIPLIER). Must be > 1.0. Drops Eq derive from config types in favor of PartialEq to support storing multiplier as f64 directly. Assisted-by: claude-opus-4-6@default <noreply@opencode.ai> * cleanup: refactor some code for consistency and simplicity * cleanup: add warning when initial backoff is greater than max * cleanup: add _DURATION suffix to max and initial backoff fields * fix: first delay must not be larger than max We also simplify the calculation of the next value.
1 parent 887ffde commit 5dd2a85

7 files changed

Lines changed: 946 additions & 28 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ possible include a PR number for easier tracking.
66

77
## Next
88

9+
* feat(grpc): add exponential backoff for reconnection attempts (#789)
910
* feat: run integration tests on more platforms (#760)
1011
* ROX-34502: reload mTLS certificates on each gRPC connection attempt (#788)
1112
* chore: add formatting and linting to integration test code (#783, #784)

Cargo.lock

Lines changed: 42 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ openssl = "0.10.75"
3030
prometheus-client = { version = "0.25.0", default-features = false }
3131
prost = "0.14.0"
3232
prost-types = "0.14.0"
33+
rand = { version = "0.10.1", default-features = false, features = ["thread_rng"] }
3334
serde = { version = "1.0.219", features = ["derive"] }
3435
serde_json = "1.0.142"
3536
shlex = "2.0.1"

fact/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ tokio-stream = { workspace = true }
2828
prometheus-client = { workspace = true }
2929
prost = { workspace = true }
3030
prost-types = { workspace = true }
31+
rand = { workspace = true }
3132
serde = { workspace = true }
3233
serde_json = { workspace = true }
3334
shlex = { workspace = true }

fact/src/config/mod.rs

Lines changed: 166 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,14 @@ const CONFIG_FILES: [&str; 4] = [
2323
"fact.yaml",
2424
];
2525

26-
#[derive(Debug, Default, PartialEq, Eq, Clone)]
26+
fn yaml_to_duration_secs(v: &Yaml) -> Option<Duration> {
27+
v.as_f64()
28+
.or_else(|| v.as_i64().map(|i| i as f64))
29+
.filter(|s| s.is_finite() && *s >= 0.0)
30+
.map(Duration::from_secs_f64)
31+
}
32+
33+
#[derive(Debug, Default, PartialEq, Clone)]
2734
pub struct FactConfig {
2835
paths: Option<Vec<PathBuf>>,
2936
pub grpc: GrpcConfig,
@@ -218,20 +225,11 @@ impl TryFrom<Vec<Yaml>> for FactConfig {
218225
config.hotreload = Some(hotreload);
219226
}
220227
"scan_interval" => {
221-
// scan_internal == 0 disables the scanner
222-
if let Some(scan_interval) = v.as_f64() {
223-
if scan_interval < 0.0 {
224-
bail!("invalid scan_interval: {scan_interval}");
225-
}
226-
config.scan_interval = Some(Duration::from_secs_f64(scan_interval));
227-
} else if let Some(scan_interval) = v.as_i64() {
228-
if scan_interval < 0 {
229-
bail!("invalid scan_interval: {scan_interval}");
230-
}
231-
config.scan_interval = Some(Duration::from_secs(scan_interval as u64))
232-
} else {
233-
bail!("scan_interval field has incorrect type: {v:?}");
234-
}
228+
// scan_interval == 0 disables the scanner
229+
let Some(scan_interval) = yaml_to_duration_secs(v) else {
230+
bail!("invalid scan_interval: {v:?}");
231+
};
232+
config.scan_interval = Some(scan_interval);
235233
}
236234
"rate_limit" => {
237235
// rate_limit == 0 means unlimited (no throttling)
@@ -328,10 +326,97 @@ impl TryFrom<&yaml::Hash> for EndpointConfig {
328326
}
329327
}
330328

331-
#[derive(Debug, Default, PartialEq, Eq, Clone)]
329+
#[derive(Debug, Default, PartialEq, Clone)]
330+
pub struct BackoffConfig {
331+
initial: Option<Duration>,
332+
max: Option<Duration>,
333+
jitter: Option<bool>,
334+
multiplier: Option<f64>,
335+
}
336+
337+
impl BackoffConfig {
338+
fn update(&mut self, from: &BackoffConfig) {
339+
if let Some(initial) = from.initial {
340+
self.initial = Some(initial);
341+
}
342+
if let Some(max) = from.max {
343+
self.max = Some(max);
344+
}
345+
if let Some(jitter) = from.jitter {
346+
self.jitter = Some(jitter);
347+
}
348+
if let Some(multiplier) = from.multiplier {
349+
self.multiplier = Some(multiplier);
350+
}
351+
}
352+
353+
pub fn initial(&self) -> Duration {
354+
self.initial.unwrap_or(Duration::from_secs(1))
355+
}
356+
357+
pub fn max(&self) -> Duration {
358+
self.max.unwrap_or(Duration::from_secs(60))
359+
}
360+
361+
pub fn jitter(&self) -> bool {
362+
self.jitter.unwrap_or(true)
363+
}
364+
365+
pub fn multiplier(&self) -> f64 {
366+
self.multiplier.unwrap_or(1.5)
367+
}
368+
}
369+
370+
impl TryFrom<&yaml::Hash> for BackoffConfig {
371+
type Error = anyhow::Error;
372+
373+
fn try_from(value: &yaml::Hash) -> Result<Self, Self::Error> {
374+
let mut backoff = BackoffConfig::default();
375+
for (k, v) in value.iter() {
376+
let Some(k) = k.as_str() else {
377+
bail!("key is not string: {k:?}");
378+
};
379+
match k {
380+
"initial" => {
381+
let Some(initial) = yaml_to_duration_secs(v).filter(|d| !d.is_zero()) else {
382+
bail!("invalid grpc.backoff.initial: {v:?}");
383+
};
384+
backoff.initial = Some(initial);
385+
}
386+
"max" => {
387+
let Some(max) = yaml_to_duration_secs(v).filter(|d| !d.is_zero()) else {
388+
bail!("invalid grpc.backoff.max: {v:?}");
389+
};
390+
backoff.max = Some(max);
391+
}
392+
"jitter" => {
393+
let Some(jitter) = v.as_bool() else {
394+
bail!("grpc.backoff.jitter field has incorrect type: {v:?}");
395+
};
396+
backoff.jitter = Some(jitter);
397+
}
398+
"multiplier" => {
399+
let Some(multiplier) = v
400+
.as_f64()
401+
.or_else(|| v.as_i64().map(|v| v as f64))
402+
.filter(|mult| mult.is_finite() && *mult > 1.0)
403+
else {
404+
bail!("invalid grpc.backoff.multiplier: {v:?}");
405+
};
406+
backoff.multiplier = Some(multiplier);
407+
}
408+
name => bail!("Invalid field 'grpc.backoff.{name}' with value: {v:?}"),
409+
}
410+
}
411+
Ok(backoff)
412+
}
413+
}
414+
415+
#[derive(Debug, Default, PartialEq, Clone)]
332416
pub struct GrpcConfig {
333417
url: Option<String>,
334418
certs: Option<PathBuf>,
419+
pub backoff: BackoffConfig,
335420
}
336421

337422
impl GrpcConfig {
@@ -343,6 +428,8 @@ impl GrpcConfig {
343428
if let Some(certs) = from.certs.as_deref() {
344429
self.certs = Some(certs.to_owned());
345430
}
431+
432+
self.backoff.update(&from.backoff);
346433
}
347434

348435
pub fn url(&self) -> Option<&str> {
@@ -377,6 +464,12 @@ impl TryFrom<&yaml::Hash> for GrpcConfig {
377464
};
378465
grpc.certs = Some(PathBuf::from(certs));
379466
}
467+
"backoff" => {
468+
let Some(backoff) = v.as_hash() else {
469+
bail!("grpc.backoff section has incorrect type: {v:?}");
470+
};
471+
grpc.backoff = BackoffConfig::try_from(backoff)?;
472+
}
380473
name => bail!("Invalid field 'grpc.{name}' with value: {v:?}"),
381474
}
382475
}
@@ -450,6 +543,30 @@ impl TryFrom<&yaml::Hash> for BpfConfig {
450543
}
451544
}
452545

546+
fn parse_duration_secs(s: &str) -> anyhow::Result<Duration> {
547+
let f = s.parse::<f64>()?;
548+
if !f.is_finite() || f < 0.0 {
549+
bail!("value must be a non-negative finite number, got {f}");
550+
}
551+
Ok(Duration::from_secs_f64(f))
552+
}
553+
554+
fn parse_positive_duration_secs(s: &str) -> anyhow::Result<Duration> {
555+
let d = parse_duration_secs(s)?;
556+
if d.is_zero() {
557+
bail!("value must be greater than zero");
558+
}
559+
Ok(d)
560+
}
561+
562+
fn parse_multiplier(s: &str) -> anyhow::Result<f64> {
563+
let mult = s.parse::<f64>()?;
564+
if !mult.is_finite() || mult <= 1.0 {
565+
bail!("multiplier must be > 1.0, got {mult}");
566+
}
567+
Ok(mult)
568+
}
569+
453570
#[derive(Debug, Parser)]
454571
#[clap(version = crate::version::FACT_VERSION, about)]
455572
pub struct FactCli {
@@ -465,6 +582,30 @@ pub struct FactCli {
465582
#[arg(short, long, env = "FACT_CERTS")]
466583
certs: Option<PathBuf>,
467584

585+
/// Initial backoff delay in seconds for gRPC reconnection
586+
///
587+
/// Default value is 1 second
588+
#[arg(long, env = "FACT_GRPC_BACKOFF_INITIAL_DURATION", value_parser = parse_positive_duration_secs)]
589+
backoff_initial: Option<Duration>,
590+
591+
/// Maximum backoff delay in seconds for gRPC reconnection
592+
///
593+
/// Default value is 60 seconds
594+
#[arg(long, env = "FACT_GRPC_BACKOFF_MAX_DURATION", value_parser = parse_positive_duration_secs)]
595+
backoff_max: Option<Duration>,
596+
597+
/// Backoff multiplier for gRPC reconnection
598+
///
599+
/// Must be > 1.0. Default value is 1.5
600+
#[arg(long, env = "FACT_GRPC_BACKOFF_MULTIPLIER", value_parser = parse_multiplier)]
601+
backoff_multiplier: Option<f64>,
602+
603+
/// Enable jitter for gRPC reconnection backoff
604+
///
605+
/// Default value is true
606+
#[arg(long, env = "FACT_GRPC_BACKOFF_JITTER")]
607+
backoff_jitter: Option<bool>,
608+
468609
/// The port to bind for all exposed endpoints
469610
#[arg(long, short, env = "FACT_ENDPOINT_ADDRESS")]
470611
address: Option<SocketAddr>,
@@ -535,8 +676,8 @@ pub struct FactCli {
535676
/// The seconds can use a decimal point for fractions of seconds.
536677
///
537678
/// Default value is 30 seconds
538-
#[arg(long, short, env = "FACT_SCAN_INTERVAL")]
539-
scan_interval: Option<f64>,
679+
#[arg(long, short, env = "FACT_SCAN_INTERVAL", value_parser = parse_duration_secs)]
680+
scan_interval: Option<Duration>,
540681

541682
/// Maximum number of file events to allow per second
542683
///
@@ -555,6 +696,12 @@ impl FactCli {
555696
grpc: GrpcConfig {
556697
url: self.url.clone(),
557698
certs: self.certs.clone(),
699+
backoff: BackoffConfig {
700+
initial: self.backoff_initial,
701+
max: self.backoff_max,
702+
jitter: self.backoff_jitter,
703+
multiplier: self.backoff_multiplier,
704+
},
558705
},
559706
endpoint: EndpointConfig {
560707
address: self.address,
@@ -568,7 +715,7 @@ impl FactCli {
568715
skip_pre_flight: resolve_bool_arg(self.skip_pre_flight, self.no_skip_pre_flight),
569716
json: resolve_bool_arg(self.json, self.no_json),
570717
hotreload: resolve_bool_arg(self.hotreload, self.no_hotreload),
571-
scan_interval: self.scan_interval.map(Duration::from_secs_f64),
718+
scan_interval: self.scan_interval,
572719
rate_limit: self.rate_limit,
573720
}
574721
}

0 commit comments

Comments
 (0)