Skip to content

Commit 9acb135

Browse files
DeviaVirclaude
andcommitted
daemon: rate-limit failed recycles, add metric + tests
Address review feedback on proactive max-age recycling: - Blocker: a failed recycle attempt kept the existing connection (good) but did not update any timestamp, so is_expired() stayed true and every subsequent request re-attempted the recycle first -- each failed attempt blocking up to DAEMON_CONNECTION_TIMEOUT under the connection mutex. During a sustained "new connections fail" event this turned every fast RPC into a request paying a full connect timeout. Now a failed attempt records last_recycle_attempt and a cooldown (DAEMON_CONN_RECYCLE_COOLDOWN, default 30s) gates retries, so the old socket keeps serving requests at full speed between attempts. - Extract the recycle decision into a pure `recycle_due()` helper and cover it with unit tests (max-age boundary, None, and cooldown). - Add a daemon_rpc_conn_recycled{result="ok|failed"} counter so recycle behavior is observable in prod. - tcp_connect_once no longer warns per-attempt; it returns one descriptive error that callers log, avoiding double log lines on the recycle path. The startup/error loop logs that error + backoff. - Document in --daemon-rpc-conn-max-age help that the reconnect is inline on the request path, so the value should be generous (minutes). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 21a1beb commit 9acb135

2 files changed

Lines changed: 116 additions & 34 deletions

File tree

src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ impl Config {
182182
.arg(
183183
Arg::with_name("daemon_rpc_conn_max_age")
184184
.long("daemon-rpc-conn-max-age")
185-
.help("Max age (in seconds) of a daemon RPC TCP connection before it is proactively recycled. Recycling re-establishes the connection, letting a load balancer (e.g. a Kubernetes ClusterSetIP) re-select a backend after node rotations. 0 = unlimited / never recycle (default)")
185+
.help("Max age (in seconds) of a daemon RPC TCP connection before it is proactively recycled. Recycling re-establishes the connection, letting a load balancer (e.g. a Kubernetes ClusterSetIP) re-select a backend after node rotations. The reconnect happens inline on the next request, so prefer a generous value (minutes, not seconds) to avoid periodic latency spikes. 0 = unlimited / never recycle (default)")
186186
.default_value("0")
187187
.takes_value(true),
188188
)

src/daemon.rs

Lines changed: 115 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use elements::encode::{deserialize, serialize_hex};
2424
use electrs_macros::trace;
2525

2626
use crate::chain::{Block, BlockHash, BlockHeader, Network, Transaction, Txid};
27-
use crate::metrics::{HistogramOpts, HistogramVec, Metrics};
27+
use crate::metrics::{CounterVec, HistogramOpts, HistogramVec, MetricOpts, Metrics};
2828
use crate::signal::Waiter;
2929
use crate::util::{HeaderList, DEFAULT_BLOCKHASH};
3030

@@ -40,6 +40,11 @@ lazy_static! {
4040
static ref DAEMON_WRITE_TIMEOUT: Duration = Duration::from_secs(
4141
env::var("DAEMON_WRITE_TIMEOUT").map_or(10 * 60, |s| s.parse().unwrap())
4242
);
43+
// Minimum delay between *failed* proactive max-age recycle attempts, so that a sustained
44+
// inability to open new connections doesn't make every request pay a connect timeout.
45+
static ref DAEMON_CONN_RECYCLE_COOLDOWN: Duration = Duration::from_secs(
46+
env::var("DAEMON_CONN_RECYCLE_COOLDOWN").map_or(30, |s| s.parse().unwrap())
47+
);
4348
}
4449

4550
const MAX_ATTEMPTS: u32 = 5;
@@ -196,6 +201,9 @@ struct Connection {
196201
established: Instant,
197202
// Maximum age of a connection before it is proactively recycled, or None for unlimited.
198203
max_age: Option<Duration>,
204+
// When the last *failed* proactive recycle attempt happened, used to rate-limit retries
205+
// (see `DAEMON_CONN_RECYCLE_COOLDOWN`). None until a recycle attempt fails.
206+
last_recycle_attempt: Option<Instant>,
199207
}
200208

201209
fn configure_stream(conn: &TcpStream) {
@@ -220,33 +228,26 @@ fn tcp_connect_once(
220228
}
221229
Err(err) => err,
222230
};
223-
let suffix = if fallback.is_some() {
224-
" (trying fallback...)"
225-
} else {
226-
""
227-
};
228-
warn!(
229-
"failed to connect to primary daemon at {}: {}{}",
230-
primary, primary_err, suffix
231-
);
231+
// Return a single descriptive error and let the caller decide how to log it, rather than
232+
// warning per-attempt here (which would double-log on the best-effort recycle path).
232233
match fallback {
233-
Some(fallback_addr) => match TcpStream::connect_timeout(&fallback_addr, *DAEMON_CONNECTION_TIMEOUT) {
234-
Ok(conn) => {
235-
info!("connected to fallback daemon at {}", fallback_addr);
236-
configure_stream(&conn);
237-
Ok((conn, fallback_addr))
238-
}
239-
Err(fallback_err) => {
240-
warn!(
241-
"failed to connect to fallback daemon at {}: {}",
242-
fallback_addr, fallback_err
243-
);
244-
bail!(ErrorKind::Connection(format!(
234+
Some(fallback_addr) => {
235+
debug!(
236+
"primary daemon at {} unreachable ({}), trying fallback {}",
237+
primary, primary_err, fallback_addr
238+
);
239+
match TcpStream::connect_timeout(&fallback_addr, *DAEMON_CONNECTION_TIMEOUT) {
240+
Ok(conn) => {
241+
info!("connected to fallback daemon at {}", fallback_addr);
242+
configure_stream(&conn);
243+
Ok((conn, fallback_addr))
244+
}
245+
Err(fallback_err) => bail!(ErrorKind::Connection(format!(
245246
"failed to connect to primary daemon at {} ({}) and fallback at {} ({})",
246247
primary, primary_err, fallback_addr, fallback_err
247-
)))
248+
))),
248249
}
249-
},
250+
}
250251
None => bail!(ErrorKind::Connection(format!(
251252
"failed to connect to daemon at {}: {}",
252253
primary, primary_err
@@ -266,15 +267,36 @@ fn tcp_connect(
266267
loop {
267268
match tcp_connect_once(primary, fallback) {
268269
Ok(res) => return Ok(res),
269-
Err(_) => {
270-
warn!("backoff 3 seconds before next attempt");
270+
Err(err) => {
271+
warn!(
272+
"{}; backoff 3 seconds before next attempt",
273+
err.display_chain()
274+
);
271275
signal.wait(Duration::from_secs(3), false)?;
272276
continue;
273277
}
274278
}
275279
}
276280
}
277281

282+
/// Decide whether an expired connection is due for a (re)attempt at proactive recycling.
283+
/// Returns false when no max age is configured, when the connection is younger than the max
284+
/// age, or when a previous recycle attempt failed less than `cooldown` ago (to avoid paying a
285+
/// connect timeout on every request during a sustained connect failure). Pure for testability.
286+
fn recycle_due(
287+
age: Duration,
288+
max_age: Option<Duration>,
289+
since_last_attempt: Option<Duration>,
290+
cooldown: Duration,
291+
) -> bool {
292+
match max_age {
293+
None => false,
294+
Some(max_age) => {
295+
age >= max_age && since_last_attempt.map_or(true, |since| since >= cooldown)
296+
}
297+
}
298+
}
299+
278300
impl Connection {
279301
#[trace]
280302
fn new(
@@ -313,6 +335,7 @@ impl Connection {
313335
signal,
314336
established: Instant::now(),
315337
max_age,
338+
last_recycle_attempt: None,
316339
})
317340
}
318341

@@ -345,11 +368,16 @@ impl Connection {
345368
)
346369
}
347370

348-
/// Whether this connection has exceeded its configured `max_age` and should be recycled.
371+
/// Whether this connection is due to be proactively recycled now: it has exceeded its
372+
/// configured `max_age` and no recent recycle attempt has failed within the cooldown.
349373
/// Always false when no max age is configured (unlimited).
350-
fn is_expired(&self) -> bool {
351-
self.max_age
352-
.map_or(false, |max_age| self.established.elapsed() >= max_age)
374+
fn should_recycle(&self) -> bool {
375+
recycle_due(
376+
self.established.elapsed(),
377+
self.max_age,
378+
self.last_recycle_attempt.map(|at| at.elapsed()),
379+
*DAEMON_CONN_RECYCLE_COOLDOWN,
380+
)
353381
}
354382

355383
#[trace]
@@ -462,6 +490,7 @@ pub struct Daemon {
462490
// monitoring
463491
latency: HistogramVec,
464492
size: HistogramVec,
493+
conn_recycle: CounterVec,
465494
}
466495

467496
impl Daemon {
@@ -506,6 +535,13 @@ impl Daemon {
506535
HistogramOpts::new("daemon_bytes", "Bitcoind RPC size (in bytes)"),
507536
&["method", "dir"],
508537
),
538+
conn_recycle: metrics.counter_vec(
539+
MetricOpts::new(
540+
"daemon_rpc_conn_recycled",
541+
"Proactive daemon RPC connection recycle attempts (by result)",
542+
),
543+
&["result"],
544+
),
509545
};
510546
let network_info = daemon.getnetworkinfo()?;
511547
info!("{:?}", network_info);
@@ -551,6 +587,7 @@ impl Daemon {
551587
rpc_threads: self.rpc_threads.clone(),
552588
latency: self.latency.clone(),
553589
size: self.size.clone(),
590+
conn_recycle: self.conn_recycle.clone(),
554591
})
555592
}
556593

@@ -597,7 +634,7 @@ impl Daemon {
597634
// the TCP connection lets a fronting load balancer (e.g. a Kubernetes ClusterSetIP)
598635
// re-select a backend, so a long-lived connection does not stay pinned to a stale
599636
// endpoint after node rotations. No-op when no max age is configured (the default).
600-
if conn.is_expired() {
637+
if conn.should_recycle() {
601638
match conn.try_reconnect_once() {
602639
Ok(new_conn) => {
603640
debug!(
@@ -606,12 +643,16 @@ impl Daemon {
606643
conn.established.elapsed()
607644
);
608645
*conn = new_conn;
646+
self.conn_recycle.with_label_values(&["ok"]).inc();
609647
}
610648
Err(err) => {
611649
// Recycling is best-effort: if no fresh socket is available (e.g. a
612650
// transient load-balancer hiccup), keep using the existing healthy
613-
// connection and retry recycling on a later request, rather than
614-
// blocking requests while the connection is still usable.
651+
// connection rather than blocking requests while it is still usable.
652+
// Record the failed attempt so we don't retry (and pay a connect timeout)
653+
// on every subsequent request; the next attempt waits out the cooldown.
654+
conn.last_recycle_attempt = Some(Instant::now());
655+
self.conn_recycle.with_label_values(&["failed"]).inc();
615656
warn!(
616657
"failed recycling expired daemon RPC connection, keeping existing connection: {}",
617658
err.display_chain()
@@ -996,3 +1037,44 @@ impl Daemon {
9961037
Ok(relayfee * 100_000f64)
9971038
}
9981039
}
1040+
1041+
#[cfg(test)]
1042+
mod tests {
1043+
use super::recycle_due;
1044+
use std::time::Duration;
1045+
1046+
const COOLDOWN: Duration = Duration::from_secs(30);
1047+
const MAX_AGE: Option<Duration> = Some(Duration::from_secs(60));
1048+
1049+
fn secs(n: u64) -> Duration {
1050+
Duration::from_secs(n)
1051+
}
1052+
1053+
#[test]
1054+
fn no_max_age_never_recycles() {
1055+
// Unlimited (the default): never recycle, regardless of age.
1056+
assert!(!recycle_due(secs(10_000), None, None, COOLDOWN));
1057+
}
1058+
1059+
#[test]
1060+
fn younger_than_max_age_does_not_recycle() {
1061+
assert!(!recycle_due(secs(5), MAX_AGE, None, COOLDOWN));
1062+
}
1063+
1064+
#[test]
1065+
fn expired_with_no_prior_attempt_recycles() {
1066+
assert!(recycle_due(secs(61), MAX_AGE, None, COOLDOWN));
1067+
}
1068+
1069+
#[test]
1070+
fn expired_within_cooldown_waits() {
1071+
// A recent failed attempt should suppress retries until the cooldown elapses,
1072+
// even though the connection is well past its max age.
1073+
assert!(!recycle_due(secs(600), MAX_AGE, Some(secs(5)), COOLDOWN));
1074+
}
1075+
1076+
#[test]
1077+
fn expired_after_cooldown_retries() {
1078+
assert!(recycle_due(secs(600), MAX_AGE, Some(secs(31)), COOLDOWN));
1079+
}
1080+
}

0 commit comments

Comments
 (0)