Skip to content

Commit bf22a45

Browse files
DeviaVirclaude
andcommitted
daemon: add configurable max-age to recycle RPC connections
Long-lived daemon RPC connections stay pinned to a single backend for their whole lifetime. When electrs connects through a load balancer such as a Kubernetes ClusterSetIP (`*.clusterset.local`), a connection established before a node rotation keeps routing to the original backend via the existing TCP/conntrack flow, even after healthier/closer backends become available. The connection is only re-established on error, so a still-working-but-stale endpoint is never rebalanced. Add a `--daemon-rpc-conn-max-age` option (seconds). When a connection exceeds the configured age it is proactively recycled before the next request, re-establishing the TCP connection so the load balancer can re-select a backend. Defaults to 0 = unlimited (never recycle), so behavior is unchanged unless explicitly enabled. The age check is also applied to the per-thread connections used for parallel RPC requests. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 4607b77 commit bf22a45

4 files changed

Lines changed: 55 additions & 1 deletion

File tree

src/bin/electrs.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ fn run_server(config: Arc<Config>, salt_rwlock: Arc<RwLock<String>>) -> Result<(
7474
config.network_type,
7575
signal.clone(),
7676
&metrics,
77+
config.daemon_conn_max_age,
7778
)?);
7879
info!("opening database at {}", config.db_path.display());
7980
let store = Arc::new(Store::open(&config, &metrics, true));

src/bin/tx-fingerprint-stats.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ fn main() {
4040
config.network_type,
4141
signal,
4242
&metrics,
43+
config.daemon_conn_max_age,
4344
)
4445
.unwrap(),
4546
);

src/config.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::net::SocketAddr;
55
use std::net::ToSocketAddrs;
66
use std::path::{Path, PathBuf};
77
use std::sync::Arc;
8+
use std::time::Duration;
89
use stderrlog;
910

1011
use crate::chain::Network;
@@ -27,6 +28,7 @@ pub struct Config {
2728
pub daemon_rpc_addr: SocketAddr,
2829
pub daemon_rpc_fallback_addr: Option<SocketAddr>,
2930
pub daemon_parallelism: usize,
31+
pub daemon_conn_max_age: Option<Duration>,
3032
pub cookie: Option<String>,
3133
pub electrum_rpc_addr: SocketAddr,
3234
pub http_addr: SocketAddr,
@@ -177,6 +179,13 @@ impl Config {
177179
.help("Number of JSONRPC requests to send in parallel")
178180
.default_value("4")
179181
)
182+
.arg(
183+
Arg::with_name("daemon_rpc_conn_max_age")
184+
.long("daemon-rpc-conn-max-age")
185+
.help("Max age (in seconds) of a daemon RPC TCP connection before it is proactively recycled. Recycling re-establishes the connection, letting a load balancer (e.g. a Kubernetes ClusterSetIP) re-select a backend after node rotations. 0 = unlimited / never recycle (default)")
186+
.default_value("0")
187+
.takes_value(true),
188+
)
180189
.arg(
181190
Arg::with_name("monitoring_addr")
182191
.long("monitoring-addr")
@@ -425,6 +434,15 @@ impl Config {
425434
.value_of("daemon_rpc_fallback_addr")
426435
.map(|e| str_to_socketaddr(e, "Bitcoin Fallback RPC"));
427436

437+
let daemon_conn_max_age: Option<Duration> = m
438+
.value_of("daemon_rpc_conn_max_age")
439+
.map(|s| {
440+
s.parse::<u64>()
441+
.unwrap_or_else(|_| panic!("invalid daemon-rpc-conn-max-age: {}", s))
442+
})
443+
.filter(|&secs| secs > 0)
444+
.map(Duration::from_secs);
445+
428446
let electrum_rpc_addr: SocketAddr = str_to_socketaddr(
429447
m.value_of("electrum_rpc_addr")
430448
.unwrap_or(&format!("127.0.0.1:{}", default_electrum_port)),
@@ -494,6 +512,7 @@ impl Config {
494512
daemon_rpc_addr,
495513
daemon_rpc_fallback_addr,
496514
daemon_parallelism: value_t_or_exit!(m, "daemon_parallelism", usize),
515+
daemon_conn_max_age,
497516
cookie,
498517
utxos_limit: value_t_or_exit!(m, "utxos_limit", usize),
499518
electrum_rpc_addr,

src/daemon.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::net::{SocketAddr, TcpStream};
66
use std::path::PathBuf;
77
use std::str::FromStr;
88
use std::sync::{Arc, Mutex};
9-
use std::time::Duration;
9+
use std::time::{Duration, Instant};
1010
use std::{env, fs, io};
1111

1212
use base64::prelude::{Engine, BASE64_STANDARD};
@@ -188,6 +188,11 @@ struct Connection {
188188
addr: SocketAddr,
189189
fallback: Option<SocketAddr>,
190190
signal: Waiter,
191+
// When the TCP connection was (re)established, used together with `max_age` to
192+
// proactively recycle long-lived connections (see `is_expired`).
193+
established: Instant,
194+
// Maximum age of a connection before it is proactively recycled, or None for unlimited.
195+
max_age: Option<Duration>,
191196
}
192197

193198
#[trace]
@@ -242,6 +247,7 @@ impl Connection {
242247
fallback: Option<SocketAddr>,
243248
cookie_getter: Arc<dyn CookieGetter>,
244249
signal: Waiter,
250+
max_age: Option<Duration>,
245251
) -> Result<Connection> {
246252
let (conn, active_addr) = tcp_connect(addr, fallback, &signal)?;
247253
debug!("connected to bitcoind at {}", active_addr);
@@ -256,6 +262,8 @@ impl Connection {
256262
addr,
257263
fallback,
258264
signal,
265+
established: Instant::now(),
266+
max_age,
259267
})
260268
}
261269

@@ -266,9 +274,17 @@ impl Connection {
266274
self.fallback,
267275
self.cookie_getter.clone(),
268276
self.signal.clone(),
277+
self.max_age,
269278
)
270279
}
271280

281+
/// Whether this connection has exceeded its configured `max_age` and should be recycled.
282+
/// Always false when no max age is configured (unlimited).
283+
fn is_expired(&self) -> bool {
284+
self.max_age
285+
.map_or(false, |max_age| self.established.elapsed() >= max_age)
286+
}
287+
272288
#[trace]
273289
fn send(&mut self, request: &str) -> Result<()> {
274290
let cookie = &self.cookie_getter.get()?;
@@ -372,6 +388,7 @@ pub struct Daemon {
372388
conn: Mutex<Connection>,
373389
message_id: Counter, // for monotonic JSONRPC 'id'
374390
signal: Waiter,
391+
conn_max_age: Option<Duration>,
375392

376393
rpc_threads: Arc<rayon::ThreadPool>,
377394

@@ -391,6 +408,7 @@ impl Daemon {
391408
network: Network,
392409
signal: Waiter,
393410
metrics: &Metrics,
411+
conn_max_age: Option<Duration>,
394412
) -> Result<Daemon> {
395413
let daemon = Daemon {
396414
daemon_dir: daemon_dir.clone(),
@@ -401,9 +419,11 @@ impl Daemon {
401419
daemon_rpc_fallback_addr,
402420
cookie_getter,
403421
signal.clone(),
422+
conn_max_age,
404423
)?),
405424
message_id: Counter::new(),
406425
signal: signal.clone(),
426+
conn_max_age,
407427
rpc_threads: Arc::new(
408428
rayon::ThreadPoolBuilder::new()
409429
.num_threads(daemon_parallelism)
@@ -460,6 +480,7 @@ impl Daemon {
460480
conn: Mutex::new(self.conn.lock().unwrap().reconnect()?),
461481
message_id: Counter::new(),
462482
signal: self.signal.clone(),
483+
conn_max_age: self.conn_max_age,
463484
rpc_threads: self.rpc_threads.clone(),
464485
latency: self.latency.clone(),
465486
size: self.size.clone(),
@@ -505,6 +526,18 @@ impl Daemon {
505526
#[trace]
506527
fn call_jsonrpc(&self, method: &str, request: &Value) -> Result<Value> {
507528
let mut conn = self.conn.lock().unwrap();
529+
// Proactively recycle connections older than the configured max age. Re-establishing
530+
// the TCP connection lets a fronting load balancer (e.g. a Kubernetes ClusterSetIP)
531+
// re-select a backend, so a long-lived connection does not stay pinned to a stale
532+
// endpoint after node rotations. No-op when no max age is configured (the default).
533+
if conn.is_expired() {
534+
debug!(
535+
"recycling daemon RPC connection to {} after {:?}",
536+
conn.addr,
537+
conn.established.elapsed()
538+
);
539+
*conn = conn.reconnect()?;
540+
}
508541
let timer = self.latency.with_label_values(&[method]).start_timer();
509542
let request = request.to_string();
510543
conn.send(&request)?;

0 commit comments

Comments
 (0)