Skip to content

Commit 9c7f90c

Browse files
committed
fix: Agent is unaware of forced Server restarts, causing
retransmission timeout latency of ~15 minutes.
1 parent d435cd1 commit 9c7f90c

File tree

1 file changed

+37
-0
lines changed

1 file changed

+37
-0
lines changed

agent/src/sender/uniform_sender.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ use std::sync::{
2727
use std::thread::{self, JoinHandle};
2828
use std::time::{Duration, Instant, SystemTime};
2929

30+
use socket2::TcpKeepalive;
31+
3032
use arc_swap::access::Access;
3133
use lazy_static::lazy_static;
3234
use log::{debug, error, info, warn};
@@ -433,6 +435,10 @@ impl<T: Sendable> UniformSender<T> {
433435
const TCP_WRITE_TIMEOUT: u64 = 3; // s
434436
const QUEUE_READ_TIMEOUT: u64 = 3; // s
435437
const DEFAULT_RECONNECT_INTERVAL: u8 = 10; // s
438+
const TCP_KEEPALIVE_TIME: u64 = 30; // s, idle time before first keepalive probe
439+
const TCP_KEEPALIVE_INTERVAL: u64 = 10; // s, interval between keepalive probes
440+
const TCP_KEEPALIVE_RETRIES: u32 = 3; // number of keepalive probes before declaring dead
441+
const TCP_USER_TIMEOUT: u64 = 100; // s, max time for unacked data before TCP gives up
436442

437443
pub fn new(
438444
id: usize,
@@ -596,6 +602,37 @@ impl<T: Sendable> UniformSender<T> {
596602
conn.tcp_stream.take();
597603
return;
598604
}
605+
#[cfg(target_os = "linux")]
606+
{
607+
// Borrow the stream's socket to set options. SockRef never owns or closes
608+
// the fd, so the mem::forget below is harmless but not actually required.
609+
let sock = socket2::SockRef::from(&*tcp_stream);
610+
// SO_KEEPALIVE: detect idle dead connections (server went away silently).
611+
// After TCP_KEEPALIVE_TIME seconds of idle the kernel sends probes every
612+
// TCP_KEEPALIVE_INTERVAL seconds; TCP_KEEPALIVE_RETRIES failed probes
613+
// close the connection (~60 s nominal; TCP_USER_TIMEOUT below overrides this, see tcp(7)).
614+
let keepalive = TcpKeepalive::new()
615+
.with_time(Duration::from_secs(Self::TCP_KEEPALIVE_TIME))
616+
.with_interval(Duration::from_secs(Self::TCP_KEEPALIVE_INTERVAL))
617+
.with_retries(Self::TCP_KEEPALIVE_RETRIES);
618+
if let Err(e) = sock.set_tcp_keepalive(&keepalive) {
619+
debug!("{} sender tcp stream set keepalive failed {}", self.name, e);
620+
}
621+
// TCP_USER_TIMEOUT: limit how long unacknowledged data may remain in
622+
// flight before the kernel gives up. This caps the TCP retransmission
623+
// storm that would otherwise last ~15 min (tcp_retries2 = 15) when the
624+
// remote end vanishes without sending RST/FIN. 100 s corresponds
625+
// roughly to tcp_retries2 = 8.
626+
if let Err(e) =
627+
sock.set_tcp_user_timeout(Some(Duration::from_secs(Self::TCP_USER_TIMEOUT)))
628+
{
629+
debug!(
630+
"{} sender tcp stream set user timeout failed {}",
631+
self.name, e
632+
);
633+
}
634+
std::mem::forget(sock);
635+
}
599636
info!(
600637
"{} sender tcp connection to {}:{} succeed.",
601638
self.name, conn.dest_ip, conn.dest_port

0 commit comments

Comments
 (0)