@@ -18,6 +18,8 @@ use std::fs::{create_dir_all, rename, File, OpenOptions};
1818use std:: io:: { BufWriter , ErrorKind , Write } ;
1919use std:: marker:: PhantomData ;
2020use std:: net:: { Shutdown , TcpStream } ;
21+ #[ cfg( target_os = "linux" ) ]
22+ use std:: os:: unix:: io:: AsRawFd ;
2123use std:: path:: Path ;
2224use std:: sync:: Mutex ;
2325use std:: sync:: {
@@ -27,6 +29,8 @@ use std::sync::{
2729use std:: thread:: { self , JoinHandle } ;
2830use std:: time:: { Duration , Instant , SystemTime } ;
2931
32+ use socket2:: TcpKeepalive ;
33+
3034use arc_swap:: access:: Access ;
3135use lazy_static:: lazy_static;
3236use log:: { debug, error, info, warn} ;
@@ -433,6 +437,10 @@ impl<T: Sendable> UniformSender<T> {
433437 const TCP_WRITE_TIMEOUT : u64 = 3 ; // s
434438 const QUEUE_READ_TIMEOUT : u64 = 3 ; // s
435439 const DEFAULT_RECONNECT_INTERVAL : u8 = 10 ; // s
440+ const TCP_KEEPALIVE_TIME : u64 = 30 ; // s, idle time before first keepalive probe
441+ const TCP_KEEPALIVE_INTERVAL : u64 = 10 ; // s, interval between keepalive probes
442+ const TCP_KEEPALIVE_RETRIES : u32 = 3 ; // number of keepalive probes before declaring dead
443+ const TCP_USER_TIMEOUT : u64 = 100 ; // s, max time for unacked data before TCP gives up
436444
437445 pub fn new (
438446 id : usize ,
@@ -596,6 +604,37 @@ impl<T: Sendable> UniformSender<T> {
596604 conn. tcp_stream . take ( ) ;
597605 return ;
598606 }
607+ #[ cfg( target_os = "linux" ) ]
608+ {
609+ // Wrap the raw fd temporarily to configure socket options; forget the
610+ // wrapper so it does not close the fd owned by TcpStream.
611+ let sock = unsafe { socket2:: Socket :: from_raw_fd ( tcp_stream. as_raw_fd ( ) ) } ;
612+ // SO_KEEPALIVE: detect idle dead connections (server went away silently).
613+ // After TCP_KEEPALIVE_TIME seconds of idle the kernel sends probes every
614+ // TCP_KEEPALIVE_INTERVAL seconds; TCP_KEEPALIVE_RETRIES failed probes
615+ // close the connection (~60 s total with these defaults).
616+ let keepalive = TcpKeepalive :: new ( )
617+ . with_time ( Duration :: from_secs ( Self :: TCP_KEEPALIVE_TIME ) )
618+ . with_interval ( Duration :: from_secs ( Self :: TCP_KEEPALIVE_INTERVAL ) )
619+ . with_retries ( Self :: TCP_KEEPALIVE_RETRIES ) ;
620+ if let Err ( e) = sock. set_tcp_keepalive ( & keepalive) {
621+ debug ! ( "{} sender tcp stream set keepalive failed {}" , self . name, e) ;
622+ }
623+ // TCP_USER_TIMEOUT: limit how long unacknowledged data may remain in
624+ // flight before the kernel gives up. This caps the TCP retransmission
625+ // storm that would otherwise last ~15 min (tcp_retries2 = 15) when the
626+ // remote end vanishes without sending RST/FIN. 100 s corresponds
627+ // roughly to tcp_retries2 = 5.
628+ if let Err ( e) =
629+ sock. set_tcp_user_timeout ( Some ( Duration :: from_secs ( Self :: TCP_USER_TIMEOUT ) ) )
630+ {
631+ debug ! (
632+ "{} sender tcp stream set user timeout failed {}" ,
633+ self . name, e
634+ ) ;
635+ }
636+ std:: mem:: forget ( sock) ;
637+ }
599638 info ! (
600639 "{} sender tcp connection to {}:{} succeed." ,
601640 self . name, conn. dest_ip, conn. dest_port
0 commit comments