diff --git a/agent/config/deepflow-agent.yaml b/agent/config/deepflow-agent.yaml index a3e5cbfd262..5b2b8af3ab3 100644 --- a/agent/config/deepflow-agent.yaml +++ b/agent/config/deepflow-agent.yaml @@ -39,6 +39,11 @@ controller-ips: ## kubernetes api watcher #async-worker-thread-number: 16 +## Enable HTTP liveness probe endpoint +#liveness-probe-enabled: true +## HTTP liveness probe listen port +#liveness-probe-port: 39090 + ## Type of agent identifier, choose from [ip-and-mac, ip], defaults to "ip-and-mac" #agent-unique-identifier: ip-and-mac diff --git a/agent/src/config/config.rs b/agent/src/config/config.rs index 1ad0953c70c..9ab567be624 100644 --- a/agent/src/config/config.rs +++ b/agent/src/config/config.rs @@ -135,6 +135,8 @@ pub struct Config { pub pid_file: String, pub team_id: String, pub cgroups_disabled: bool, + pub liveness_probe_enabled: bool, + pub liveness_probe_port: u16, } impl Config { @@ -297,6 +299,8 @@ impl Default for Config { pid_file: Default::default(), team_id: "".into(), cgroups_disabled: false, + liveness_probe_enabled: true, + liveness_probe_port: 39090, } } } @@ -3975,6 +3979,8 @@ mod tests { .expect("failed loading config file"); assert_eq!(c.controller_ips.len(), 1); assert_eq!(&c.controller_ips[0], "127.0.0.1"); + assert!(c.liveness_probe_enabled); + assert_eq!(c.liveness_probe_port, 39090); } #[test] diff --git a/agent/src/dispatcher/analyzer_mode_dispatcher.rs b/agent/src/dispatcher/analyzer_mode_dispatcher.rs index de30e5a687b..7145106c4e9 100644 --- a/agent/src/dispatcher/analyzer_mode_dispatcher.rs +++ b/agent/src/dispatcher/analyzer_mode_dispatcher.rs @@ -47,6 +47,7 @@ use crate::{ }, flow_generator::{flow_map::Config, FlowMap}, handler::{MiniPacket, PacketHandler}, + liveness::{self, ComponentId, ComponentSpec, LivenessRegistry}, rpc::get_timestamp, utils::{ bytes::read_u32_be, @@ -153,6 +154,7 @@ pub(super) struct AnalyzerModeDispatcher { pub(super) pool_raw_size: usize, pub(super) flow_generator_thread_handler: Option>, pub(super) pipeline_thread_handler: Option>, + pub(super) liveness_registry: Option, pub(super) queue_debugger: Arc, pub(super) stats_collector: Arc, pub(super) inner_queue_size: usize, @@ -270,6 +272,7 @@ impl AnalyzerModeDispatcher { let collector_config = base.collector_config.clone(); let packet_sequence_output_queue = base.packet_sequence_output_queue.clone(); // Enterprise Edition Feature: packet-sequence let stats = base.stats.clone(); + let liveness_registry = self.liveness_registry.clone(); #[cfg(any(target_os = "linux", target_os = "android"))] let cpu_set = base.options.lock().unwrap().cpu_set; @@ -277,6 +280,15 @@ impl AnalyzerModeDispatcher { thread::Builder::new() .name("dispatcher-packet-to-flow-generator".to_owned()) .spawn(move || { + let liveness = liveness::register( + liveness_registry.as_ref(), + ComponentSpec { + id: ComponentId::new("dispatcher-flow-generator", id as u32), + display_name: "dispatcher analyzer flow generator".into(), + timeout_ms: BaseDispatcher::LIVENESS_TIMEOUT_MS, + ..Default::default() + }, + ); let mut timestamp_map: HashMap = HashMap::new(); let mut batch = Vec::with_capacity(HANDLER_BATCH_SIZE); let mut output_batch = Vec::with_capacity(HANDLER_BATCH_SIZE); @@ -298,7 +310,6 @@ impl AnalyzerModeDispatcher { warn!("CPU Affinity({:?}) bind error: {:?}.", &cpu_set, e); } } - while !terminated.load(Ordering::Relaxed) { let config = Config { flow: &flow_map_config.load(), @@ -309,8 +320,11 @@ impl AnalyzerModeDispatcher { }; match receiver.recv_all(&mut batch, Some(Duration::from_secs(1))) { - Ok(_) => {} + Ok(_) => { + liveness.heartbeat(); + } Err(queue::Error::Timeout) => { + liveness.heartbeat(); flow_map.inject_flush_ticker(&config, Duration::ZERO); continue; } @@ -535,6 +549,15 @@ impl AnalyzerModeDispatcher { } pub(super) fn run(&mut self) { + let liveness_handle = liveness::register( + self.liveness_registry.as_ref(), + ComponentSpec { + id: ComponentId::new("dispatcher", self.base.is.id as u32), + display_name: "dispatcher analyzer".into(), + timeout_ms: BaseDispatcher::LIVENESS_TIMEOUT_MS, + ..Default::default() + }, + ); let sender_to_parser = self.setup_inner_thread_and_queue(); let base = &mut self.base.is; info!("Start analyzer dispatcher {}", base.log_id); @@ -543,6 +566,7 @@ impl AnalyzerModeDispatcher { let id = base.id; let mut batch = Vec::with_capacity(HANDLER_BATCH_SIZE); let mut allocator = Allocator::new(self.raw_packet_block_size); + let mut last_liveness = Duration::ZERO; #[cfg(any(target_os = "linux", target_os = "android"))] let cpu_set = base.options.lock().unwrap().cpu_set; #[cfg(any(target_os = "linux", target_os = "android"))] @@ -574,6 +598,7 @@ impl AnalyzerModeDispatcher { } } if recved.is_none() { + liveness_handle.heartbeat(); if base.tap_interface_whitelist.next_sync(Duration::ZERO) { base.need_update_bpf.store(true, Ordering::Relaxed); } @@ -586,6 +611,10 @@ impl AnalyzerModeDispatcher { } let (packet, timestamp) = recved.unwrap(); + if timestamp >= last_liveness + BaseDispatcher::LIVENESS_HEARTBEAT_INTERVAL { + liveness_handle.heartbeat(); + last_liveness = timestamp; + } // From here on, ANALYZER mode is different from LOCAL mode base.counter.rx.fetch_add(1, Ordering::Relaxed); @@ -615,6 +644,7 @@ impl AnalyzerModeDispatcher { let _ = handler.join(); } + liveness_handle.pause(); self.base.terminate_handler(); info!("Stopped dispatcher {}", self.base.is.log_id); } diff --git a/agent/src/dispatcher/base_dispatcher.rs b/agent/src/dispatcher/base_dispatcher.rs index 19f659495b5..eb867054343 100644 --- a/agent/src/dispatcher/base_dispatcher.rs +++ b/agent/src/dispatcher/base_dispatcher.rs @@ -142,6 +142,9 @@ pub(super) struct InternalState { } impl BaseDispatcher { + pub(super) const LIVENESS_TIMEOUT_MS: u64 = 60_000; + pub(super) const LIVENESS_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(10); + pub(super) fn prepare_flow( meta_packet: &mut MetaPacket, tap_type: CaptureNetworkType, diff --git a/agent/src/dispatcher/local_mode_dispatcher.rs b/agent/src/dispatcher/local_mode_dispatcher.rs index 44b19224dec..0999a9f2bc7 100644 --- a/agent/src/dispatcher/local_mode_dispatcher.rs +++ b/agent/src/dispatcher/local_mode_dispatcher.rs @@ -49,6 +49,7 @@ use crate::{ config::DispatcherConfig, flow_generator::{flow_map::Config, FlowMap}, handler::MiniPacket, + liveness::{self, ComponentId, ComponentSpec, LivenessRegistry}, rpc::get_timestamp, utils::bytes::read_u16_be, }; @@ -59,6 +60,7 @@ use public::{ pub(super) struct LocalModeDispatcher { pub(super) base: BaseDispatcher, + pub(super) liveness_registry: Option, #[cfg(target_os = "linux")] pub(super) extractor: Arc, } @@ -197,10 +199,20 @@ impl LocalModeDispatcher { } pub(super) fn run(&mut self) { + let liveness_handle = liveness::register( + self.liveness_registry.as_ref(), + ComponentSpec { + id: ComponentId::new("dispatcher", self.base.is.id as u32), + display_name: "dispatcher local".into(), + timeout_ms: BaseDispatcher::LIVENESS_TIMEOUT_MS, + ..Default::default() + }, + ); let base = &mut self.base.is; info!("Start dispatcher {}", base.log_id); let time_diff = base.ntp_diff.load(Ordering::Relaxed); let mut prev_timestamp = get_timestamp(time_diff); + let mut last_liveness = Duration::ZERO; #[cfg(any(target_os = "linux", target_os = "android"))] let cpu_set = base.options.lock().unwrap().cpu_set; #[cfg(any(target_os = "linux", target_os = "android"))] @@ -257,6 +269,7 @@ impl LocalModeDispatcher { ) }; if recved.is_none() { + liveness_handle.heartbeat(); flow_map.inject_flush_ticker(&config, Duration::ZERO); if base.tap_interface_whitelist.next_sync(Duration::ZERO) { base.need_update_bpf.store(true, Ordering::Relaxed); @@ -269,6 +282,10 @@ impl LocalModeDispatcher { continue; } let (mut packet, mut timestamp) = recved.unwrap(); + if timestamp >= last_liveness + BaseDispatcher::LIVENESS_HEARTBEAT_INTERVAL { + liveness_handle.heartbeat(); + last_liveness = timestamp; + } let Some(meta_packet) = Self::process_packet( base, &config, @@ -299,6 +316,7 @@ impl LocalModeDispatcher { base.check_and_update_bpf(&mut self.base.engine); } + liveness_handle.pause(); self.base.terminate_handler(); info!("Stopped dispatcher {}", self.base.is.log_id); } diff --git a/agent/src/dispatcher/local_multins_mode_dispatcher.rs b/agent/src/dispatcher/local_multins_mode_dispatcher.rs index 5a944c98298..467752001c5 100644 --- a/agent/src/dispatcher/local_multins_mode_dispatcher.rs +++ b/agent/src/dispatcher/local_multins_mode_dispatcher.rs @@ -1,4 +1,5 @@ use std::{ + borrow::Cow, collections::{HashMap, HashSet}, ffi::CString, mem, @@ -33,6 +34,7 @@ use crate::{ config::handler::DispatcherAccess, exception::ExceptionHandler, flow_generator::{flow_map::Config, FlowMap}, + liveness::{self, ComponentId, ComponentSpec, LivenessRegistry}, rpc::get_timestamp, utils::stats::QueueStats, }; @@ -41,20 +43,23 @@ const PACKET_BATCH_SIZE: usize = 64; const SETNS_RETRIES: usize = 3; pub struct LocalMultinsModeDispatcher { - base: BaseDispatcher, + pub(super) base: BaseDispatcher, - receiver_manager: Option>, + pub(super) receiver_manager: Option>, + pub(super) liveness_registry: Option, } impl LocalMultinsModeDispatcher { - pub fn new(base: BaseDispatcher) -> Self { - Self { - base, - receiver_manager: None, - } - } - pub fn run(&mut self) { + let liveness_handle = liveness::register( + self.liveness_registry.as_ref(), + ComponentSpec { + id: ComponentId::new("dispatcher", self.base.is.id as u32), + display_name: "dispatcher local multins".into(), + timeout_ms: BaseDispatcher::LIVENESS_TIMEOUT_MS, + ..Default::default() + }, + ); info!("Start local multi-namespace dispatcher"); let base = &mut self.base.is; @@ -84,6 +89,8 @@ impl LocalMultinsModeDispatcher { counter: base.counter.clone(), ntp_diff: base.ntp_diff.clone(), bpf_controls: bpf_controls.clone(), + liveness_registry: self.liveness_registry.clone(), + liveness_id: id as u32, output: packet_input, }; self.receiver_manager.replace( @@ -129,8 +136,11 @@ impl LocalMultinsModeDispatcher { } match packet_output.recv_all(&mut batch, Some(Duration::from_secs(1))) { - Ok(_) => {} + Ok(_) => { + liveness_handle.heartbeat(); + } Err(queue::Error::Timeout) => { + liveness_handle.heartbeat(); flow_map.inject_flush_ticker(&config, Duration::ZERO); let mut bpf_controls = bpf_controls.lock().unwrap(); if base.need_update_bpf.swap(false, Ordering::Relaxed) { @@ -225,6 +235,7 @@ impl LocalMultinsModeDispatcher { }); } } + liveness_handle.pause(); info!("Stopping local multi-namespace dispatcher"); info!("Wait for receiver manager to stop"); self.receiver_manager.take().unwrap().join().unwrap(); @@ -258,6 +269,7 @@ struct BpfControl { struct PktReceiver { pause: Arc, terminated: Arc, + dispatcher_id: u32, netns: NsFile, config: DispatcherAccess, @@ -269,6 +281,7 @@ struct PktReceiver { exception_handler: ExceptionHandler, counter: Arc, ntp_diff: Arc, + liveness_registry: Option, bpf_control: Arc, @@ -276,6 +289,20 @@ struct PktReceiver { } impl PktReceiver { + fn liveness_id(dispatcher_id: u32, ns_ino: u32) -> u32 { + let _ = dispatcher_id; + ns_ino + } + + fn liveness_display_name(netns: &NsFile) -> Cow<'static, str> { + match netns { + NsFile::Root => Cow::Borrowed("dispatcher local multins packet receiver (root)"), + _ => Cow::Owned(format!( + "dispatcher local multins packet receiver ({netns})" + )), + } + } + fn check_and_update_bpf( is_root: bool, log_prefix: &str, @@ -357,7 +384,21 @@ impl PktReceiver { } else { self.netns.get_inode().unwrap() as u32 }; + let is_root = self.netns == NsFile::Root; let log_prefix = format!("pkt-rcv({}):", self.netns); + let liveness = liveness::register( + self.liveness_registry.as_ref(), + ComponentSpec { + id: ComponentId::new( + "dispatcher-local-multins-pkt-receiver", + Self::liveness_id(self.dispatcher_id, ns_ino), + ), + display_name: Self::liveness_display_name(&self.netns), + timeout_ms: BaseDispatcher::LIVENESS_TIMEOUT_MS, + ..Default::default() + }, + ); + let mut last_liveness = Duration::ZERO; // try to setns a few times because this can fail when process terminates for i in 1..=SETNS_RETRIES { let e = match self.netns.open_and_setns() { @@ -440,10 +481,11 @@ impl PktReceiver { } let Some((ref packet, timestamp)) = recved else { + liveness.heartbeat(); drop(recved); if self.bpf_control.need_update.swap(false, Ordering::Relaxed) { Self::check_and_update_bpf( - self.netns == NsFile::Root, + is_root, &log_prefix, &self.bpf_control, &mut engine, @@ -459,6 +501,10 @@ impl PktReceiver { if self.pause.load(Ordering::Relaxed) { continue; } + if timestamp >= last_liveness + BaseDispatcher::LIVENESS_HEARTBEAT_INTERVAL { + liveness.heartbeat(); + last_liveness = timestamp; + } let buffer = allocator.allocate_with(&packet.data); let info = Packet { @@ -474,7 +520,7 @@ impl PktReceiver { drop(recved); if self.bpf_control.need_update.swap(false, Ordering::Relaxed) { Self::check_and_update_bpf( - self.netns == NsFile::Root, + is_root, &log_prefix, &self.bpf_control, &mut engine, @@ -513,6 +559,8 @@ struct ReceiverManager { exception_handler: ExceptionHandler, counter: Arc, ntp_diff: Arc, + liveness_registry: Option, + liveness_id: u32, output: DebugSender, } @@ -551,6 +599,15 @@ impl ReceiverManager { super::set_cpu_affinity(&self.options); info!("Receiver manager started"); + let liveness = liveness::register( + self.liveness_registry.as_ref(), + ComponentSpec { + id: ComponentId::new("dispatcher-receiver-manager", self.liveness_id), + display_name: "dispatcher receiver manager".into(), + timeout_ms: BaseDispatcher::LIVENESS_TIMEOUT_MS, + ..Default::default() + }, + ); let mut loop_count = 0; let mut zombie_threads = vec![]; @@ -563,6 +620,7 @@ impl ReceiverManager { thread::sleep(Duration::from_secs(1)); continue; } + liveness.heartbeat(); // check if pkt receiver threads are running let mut bpf_controls = self.bpf_controls.lock().unwrap(); @@ -659,6 +717,8 @@ impl ReceiverManager { exception_handler: self.exception_handler.clone(), counter: self.counter.clone(), ntp_diff: self.ntp_diff.clone(), + liveness_registry: self.liveness_registry.clone(), + dispatcher_id: self.liveness_id, bpf_control: bpf_control.clone(), output: self.output.clone(), }; diff --git a/agent/src/dispatcher/local_plus_mode_dispatcher.rs b/agent/src/dispatcher/local_plus_mode_dispatcher.rs index 339a0a5b8f1..4630a81e2d7 100644 --- a/agent/src/dispatcher/local_plus_mode_dispatcher.rs +++ b/agent/src/dispatcher/local_plus_mode_dispatcher.rs @@ -49,6 +49,7 @@ use crate::{ config::DispatcherConfig, flow_generator::{flow_map::Config, FlowMap}, handler::MiniPacket, + liveness::{self, ComponentId, ComponentSpec, LivenessRegistry}, rpc::get_timestamp, utils::{ bytes::read_u16_be, @@ -73,6 +74,7 @@ pub(super) struct LocalPlusModeDispatcher { pub(super) stats_collector: Arc, pub(super) flow_generator_thread_handler: Option>, pub(super) pipeline_thread_handler: Option>, + pub(super) liveness_registry: Option, pub(super) inner_queue_size: usize, pub(super) raw_packet_block_size: usize, pub(super) pool_raw_size: usize, @@ -113,6 +115,7 @@ impl LocalPlusModeDispatcher { let npb_dedup_enabled = base.npb_dedup_enabled.clone(); let pool_raw_size = self.pool_raw_size; let tunnel_type_trim_bitmap = base.tunnel_type_trim_bitmap.clone(); + let liveness_registry = self.liveness_registry.clone(); #[cfg(any(target_os = "linux", target_os = "android"))] let cpu_set = base.options.lock().unwrap().cpu_set; @@ -120,6 +123,15 @@ impl LocalPlusModeDispatcher { thread::Builder::new() .name("dispatcher-packet-to-flow-generator".to_owned()) .spawn(move || { + let liveness = liveness::register( + liveness_registry.as_ref(), + ComponentSpec { + id: ComponentId::new("dispatcher-flow-generator", id as u32), + display_name: "dispatcher local plus flow generator".into(), + timeout_ms: BaseDispatcher::LIVENESS_TIMEOUT_MS, + ..Default::default() + }, + ); let mut batch = Vec::with_capacity(HANDLER_BATCH_SIZE); let mut output_batch = Vec::with_capacity(HANDLER_BATCH_SIZE); let mut flow_map = FlowMap::new( @@ -140,7 +152,6 @@ impl LocalPlusModeDispatcher { warn!("CPU Affinity({:?}) bind error: {:?}.", &cpu_set, e); } } - while !terminated.load(Ordering::Relaxed) { let config = Config { flow: &flow_map_config.load(), @@ -151,8 +162,11 @@ impl LocalPlusModeDispatcher { }; match receiver.recv_all(&mut batch, Some(Duration::from_secs(1))) { - Ok(_) => {} + Ok(_) => { + liveness.heartbeat(); + } Err(queue::Error::Timeout) => { + liveness.heartbeat(); flow_map.inject_flush_ticker(&config, Duration::ZERO); continue; } @@ -377,6 +391,15 @@ impl LocalPlusModeDispatcher { } pub(super) fn run(&mut self) { + let liveness_handle = liveness::register( + self.liveness_registry.as_ref(), + ComponentSpec { + id: ComponentId::new("dispatcher", self.base.is.id as u32), + display_name: "dispatcher local plus".into(), + timeout_ms: BaseDispatcher::LIVENESS_TIMEOUT_MS, + ..Default::default() + }, + ); let sender_to_parser = self.setup_inner_thread_and_queue(); let base = &mut self.base.is; info!("Start local plus dispatcher {}", base.log_id); @@ -385,7 +408,7 @@ impl LocalPlusModeDispatcher { let id = base.id; let mut batch = Vec::with_capacity(HANDLER_BATCH_SIZE); let mut allocator = Allocator::new(self.raw_packet_block_size); - + let mut last_liveness = Duration::ZERO; while !base.terminated.load(Ordering::Relaxed) { if base.reset_whitelist.swap(false, Ordering::Relaxed) { base.tap_interface_whitelist.reset(); @@ -408,6 +431,7 @@ impl LocalPlusModeDispatcher { } } if recved.is_none() { + liveness_handle.heartbeat(); if base.tap_interface_whitelist.next_sync(Duration::ZERO) { base.need_update_bpf.store(true, Ordering::Relaxed); } @@ -420,6 +444,10 @@ impl LocalPlusModeDispatcher { } let (packet, timestamp) = recved.unwrap(); + if timestamp >= last_liveness + BaseDispatcher::LIVENESS_HEARTBEAT_INTERVAL { + liveness_handle.heartbeat(); + last_liveness = timestamp; + } base.counter.rx.fetch_add(1, Ordering::Relaxed); base.counter @@ -450,6 +478,7 @@ impl LocalPlusModeDispatcher { let _ = handler.join(); } + liveness_handle.pause(); self.base.terminate_handler(); info!("Stopped dispatcher {}", self.base.is.log_id); } diff --git a/agent/src/dispatcher/mirror_mode_dispatcher.rs b/agent/src/dispatcher/mirror_mode_dispatcher.rs index 6168eca5286..829c8ab0e41 100644 --- a/agent/src/dispatcher/mirror_mode_dispatcher.rs +++ b/agent/src/dispatcher/mirror_mode_dispatcher.rs @@ -53,6 +53,7 @@ use crate::{ flow_generator::{flow_map::Config, FlowMap}, handler::PacketHandlerBuilder, handler::{MiniPacket, PacketHandler}, + liveness::{self, ComponentId, ComponentSpec, LivenessRegistry}, rpc::get_timestamp, utils::environment::is_tt_hyper_v_compute, }; @@ -418,6 +419,7 @@ pub fn swap_last_timestamp( pub(super) struct MirrorModeDispatcher { pub(super) base: BaseDispatcher, + pub(super) liveness_registry: Option, pub(super) dedup: PacketDedupMap, pub(super) local_vm_mac_set: Arc>>, pub(super) local_segment_macs: Vec, @@ -543,11 +545,20 @@ impl MirrorModeDispatcher { } pub(super) fn run(&mut self) { + let liveness_handle = liveness::register( + self.liveness_registry.as_ref(), + ComponentSpec { + id: ComponentId::new("dispatcher", self.base.is.id as u32), + display_name: "dispatcher mirror".into(), + timeout_ms: BaseDispatcher::LIVENESS_TIMEOUT_MS, + ..Default::default() + }, + ); let base = &mut self.base.is; info!("Start mirror dispatcher {}", base.log_id); let time_diff = base.ntp_diff.load(Ordering::Relaxed); let mut prev_timestamp = get_timestamp(time_diff); - + let mut last_liveness = Duration::ZERO; let mut flow_map = FlowMap::new( base.id as u32, Some(base.flow_output_queue.clone()), @@ -592,6 +603,7 @@ impl MirrorModeDispatcher { ) }; if recved.is_none() { + liveness_handle.heartbeat(); flow_map.inject_flush_ticker(&config, Duration::ZERO); if base.tap_interface_whitelist.next_sync(Duration::ZERO) { base.need_update_bpf.store(true, Ordering::Relaxed); @@ -604,6 +616,10 @@ impl MirrorModeDispatcher { continue; } let (mut packet, mut timestamp) = recved.unwrap(); + if timestamp >= last_liveness + BaseDispatcher::LIVENESS_HEARTBEAT_INTERVAL { + liveness_handle.heartbeat(); + last_liveness = timestamp; + } match swap_last_timestamp( &mut self.last_timestamp_array, @@ -732,6 +748,7 @@ impl MirrorModeDispatcher { base.check_and_update_bpf(&mut self.base.engine); } + liveness_handle.pause(); self.pipelines.clear(); self.base.terminate_handler(); self.last_timestamp_array.clear(); diff --git a/agent/src/dispatcher/mirror_plus_mode_dispatcher.rs b/agent/src/dispatcher/mirror_plus_mode_dispatcher.rs index 4b60a9826ea..171320547a7 100644 --- a/agent/src/dispatcher/mirror_plus_mode_dispatcher.rs +++ b/agent/src/dispatcher/mirror_plus_mode_dispatcher.rs @@ -48,6 +48,7 @@ use crate::{ PacketCounter, }, flow_generator::{flow_map::Config, FlowMap}, + liveness::{self, ComponentId, ComponentSpec, LivenessRegistry}, rpc::get_timestamp, utils::stats::{self, Countable, QueueStats}, }; @@ -217,6 +218,7 @@ pub(super) struct MirrorPlusModeDispatcher { pub(super) agent_type: Arc>, pub(super) mac: u32, pub(super) flow_generator_thread_handler: Option>, + pub(super) liveness_registry: Option, pub(super) queue_debugger: Arc, pub(super) inner_queue_size: usize, pub(super) stats_collector: Arc, @@ -289,6 +291,7 @@ impl MirrorPlusModeDispatcher { let mac = self.mac; let agent_type = self.agent_type.clone(); let local_vm_mac_set = self.local_vm_mac_set.clone(); + let liveness_registry = self.liveness_registry.clone(); #[cfg(any(target_os = "linux", target_os = "android"))] let cpu_set = base.options.lock().unwrap().cpu_set; #[cfg(any(target_os = "linux", target_os = "android"))] @@ -298,6 +301,15 @@ impl MirrorPlusModeDispatcher { thread::Builder::new() .name("dispatcher-packet-to-flow-generator".to_owned()) .spawn(move || { + let liveness = liveness::register( + liveness_registry.as_ref(), + ComponentSpec { + id: ComponentId::new("dispatcher-flow-generator", id as u32), + display_name: "dispatcher mirror plus flow generator".into(), + timeout_ms: BaseDispatcher::LIVENESS_TIMEOUT_MS, + ..Default::default() + }, + ); let mut batch = Vec::with_capacity(HANDLER_BATCH_SIZE); let mut flow_map = FlowMap::new( id as u32, @@ -319,7 +331,6 @@ impl MirrorPlusModeDispatcher { warn!("CPU Affinity({:?}) bind error: {:?}.", &cpu_set, e); } } - while !terminated.load(Ordering::Relaxed) { let config = Config { flow: &flow_map_config.load(), @@ -331,8 +342,11 @@ impl MirrorPlusModeDispatcher { let cloud_gateway_traffic = config.flow.cloud_gateway_traffic; match receiver.recv_all(&mut batch, Some(Duration::from_secs(1))) { - Ok(_) => {} + Ok(_) => { + liveness.heartbeat(); + } Err(queue::Error::Timeout) => { + liveness.heartbeat(); flow_map.inject_flush_ticker(&config, Duration::ZERO); continue; } @@ -467,6 +481,15 @@ impl MirrorPlusModeDispatcher { } pub(super) fn run(&mut self) { + let liveness_handle = liveness::register( + self.liveness_registry.as_ref(), + ComponentSpec { + id: ComponentId::new("dispatcher", self.base.is.id as u32), + display_name: "dispatcher mirror plus".into(), + timeout_ms: BaseDispatcher::LIVENESS_TIMEOUT_MS, + ..Default::default() + }, + ); info!("Start mirror plus dispatcher {}", self.base.is.log_id); let sender_to_parser = self.setup_inner_thread_and_queue(); let base = &mut self.base.is; @@ -475,6 +498,7 @@ impl MirrorPlusModeDispatcher { let mut batch = Vec::with_capacity(HANDLER_BATCH_SIZE); let id = base.id; let mut allocator = Allocator::new(self.raw_packet_block_size); + let mut last_liveness = Duration::ZERO; #[cfg(any(target_os = "linux", target_os = "android"))] let cpu_set = base.options.lock().unwrap().cpu_set; #[cfg(any(target_os = "linux", target_os = "android"))] @@ -504,6 +528,7 @@ impl MirrorPlusModeDispatcher { } if recved.is_none() { + liveness_handle.heartbeat(); if base.tap_interface_whitelist.next_sync(Duration::ZERO) { base.need_update_bpf.store(true, Ordering::Relaxed); } @@ -516,6 +541,10 @@ impl MirrorPlusModeDispatcher { } let (packet, timestamp) = recved.unwrap(); + if timestamp >= last_liveness + BaseDispatcher::LIVENESS_HEARTBEAT_INTERVAL { + liveness_handle.heartbeat(); + last_liveness = timestamp; + } base.counter.rx.fetch_add(1, Ordering::Relaxed); base.counter @@ -542,6 +571,7 @@ impl MirrorPlusModeDispatcher { if let Some(handler) = self.flow_generator_thread_handler.take() { let _ = handler.join(); } + liveness_handle.pause(); self.base.terminate_handler(); info!("Stopped dispatcher {}", self.base.is.log_id); } diff --git a/agent/src/dispatcher/mod.rs b/agent/src/dispatcher/mod.rs index ce1a791f5de..f929d9fece6 100644 --- a/agent/src/dispatcher/mod.rs +++ b/agent/src/dispatcher/mod.rs @@ -46,7 +46,7 @@ use log::error; use log::{debug, info, warn}; #[cfg(any(target_os = "linux", target_os = "android"))] use nix::sched::CpuSet; -use packet_dedup::*; +use packet_dedup::PacketDedupMap; use public::debug::QueueDebugger; use special_recv_engine::Libpcap; #[cfg(target_os = "linux")] @@ -87,6 +87,7 @@ use crate::{ exception::ExceptionHandler, flow_generator::AppProto, handler::{PacketHandler, PacketHandlerBuilder}, + liveness::LivenessRegistry, policy::PolicyGetter, utils::{ environment::get_mac_by_name, @@ -768,6 +769,7 @@ pub struct DispatcherBuilder { analyzer_raw_packet_block_size: Option, tunnel_type_trim_bitmap: Option, bond_group: Option>, + liveness_registry: Option, } impl DispatcherBuilder { @@ -946,6 +948,11 @@ impl DispatcherBuilder { self } + pub fn liveness_registry(mut self, v: Option) -> Self { + self.liveness_registry = v; + self + } + pub fn build(mut self) -> Result { #[cfg(target_os = "linux")] let netns = self.netns.unwrap_or_default(); @@ -1009,6 +1016,12 @@ impl DispatcherBuilder { .platform_poller .take() .ok_or(Error::ConfigIncomplete("no platform poller".into()))?; + let dispatcher_config = self + .dispatcher_config + .take() + .ok_or(Error::ConfigIncomplete("no dispatcher config".into()))?; + let inner_interface_capture_enabled = + dispatcher_config.load().inner_interface_capture_enabled; let is = InternalState { log_id: { @@ -1092,10 +1105,7 @@ impl DispatcherBuilder { .collector_config .take() .ok_or(Error::ConfigIncomplete("no collector config".into()))?, - dispatcher_config: self - .dispatcher_config - .take() - .ok_or(Error::ConfigIncomplete("no dispatcher config".into()))?, + dispatcher_config, policy_getter: self .policy_getter .ok_or(Error::ConfigIncomplete("no policy".into()))?, @@ -1146,7 +1156,7 @@ impl DispatcherBuilder { stats_collector: collector.clone(), flow_generator_thread_handler: None, pipeline_thread_handler: None, - pool_raw_size: snap_len, + liveness_registry: self.liveness_registry.clone(), inner_queue_size: self .analyzer_queue_size .take() @@ -1154,21 +1164,28 @@ impl DispatcherBuilder { raw_packet_block_size: self.analyzer_raw_packet_block_size.take().ok_or( Error::ConfigIncomplete("no analyzer-raw-packet-block-size".into()), )?, + pool_raw_size: snap_len, }) } else { #[cfg(target_os = "linux")] - if base - .is - .dispatcher_config - .load() - .inner_interface_capture_enabled - { - DispatcherFlavor::LocalMultins(LocalMultinsModeDispatcher::new(base)) + if inner_interface_capture_enabled { + DispatcherFlavor::LocalMultins(LocalMultinsModeDispatcher { + base, + receiver_manager: None, + liveness_registry: self.liveness_registry.clone(), + }) } else { - DispatcherFlavor::Local(LocalModeDispatcher { base, extractor }) + DispatcherFlavor::Local(LocalModeDispatcher { + base, + liveness_registry: self.liveness_registry.clone(), + extractor, + }) } #[cfg(not(target_os = "linux"))] - DispatcherFlavor::Local(LocalModeDispatcher { base }) + DispatcherFlavor::Local(LocalModeDispatcher { + base, + liveness_registry: self.liveness_registry.clone(), + }) } } PacketCaptureType::Mirror => { @@ -1187,6 +1204,7 @@ impl DispatcherBuilder { )), mac: get_mac_by_name(src_interface), flow_generator_thread_handler: None, + liveness_registry: self.liveness_registry.clone(), queue_debugger, inner_queue_size: self .analyzer_queue_size @@ -1200,13 +1218,14 @@ impl DispatcherBuilder { } else { DispatcherFlavor::Mirror(MirrorModeDispatcher { base, + liveness_registry: self.liveness_registry.clone(), dedup: PacketDedupMap::new(), local_vm_mac_set: Arc::new(RwLock::new(HashMap::new())), local_segment_macs: vec![], tap_bridge_macs: vec![], - pipelines: HashMap::new(), #[cfg(target_os = "linux")] poller: Some(platform_poller), + pipelines: HashMap::new(), updated: Arc::new(AtomicBool::new(false)), agent_type: Arc::new(RwLock::new( self.agent_type @@ -1234,8 +1253,9 @@ impl DispatcherBuilder { pool_raw_size: snap_len, flow_generator_thread_handler: None, pipeline_thread_handler: None, - stats_collector: collector.clone(), + liveness_registry: self.liveness_registry.clone(), queue_debugger, + stats_collector: collector.clone(), inner_queue_size: self .analyzer_queue_size .take() diff --git a/agent/src/lib.rs b/agent/src/lib.rs index 8995bdc1dd7..dc5e92da4fa 100644 --- a/agent/src/lib.rs +++ b/agent/src/lib.rs @@ -30,6 +30,7 @@ pub mod exception; pub mod flow_generator; mod handler; mod integration_collector; +mod liveness; mod metric; mod monitor; mod platform; diff --git a/agent/src/liveness.rs b/agent/src/liveness.rs new file mode 100644 index 00000000000..a2fe83a7aec --- /dev/null +++ b/agent/src/liveness.rs @@ -0,0 +1,640 @@ +/* + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::{ + borrow::Cow, + collections::HashMap, + convert::Infallible, + panic::Location, + sync::{ + atomic::{AtomicBool, AtomicPtr, AtomicU64, AtomicU8, Ordering}, + Arc, Weak, + }, + time::Instant, +}; + +use hyper::{ + header::CONTENT_TYPE, + service::{make_service_fn, service_fn}, + Body, Method, Request, Response, Server, StatusCode, +}; +use log::{debug, error, info, trace, warn}; +use parking_lot::Mutex; +use serde::Serialize; +use tokio::{runtime::Runtime, sync::oneshot, task::JoinHandle}; + +use crate::trident::VersionInfo; + +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize)] +#[repr(u8)] +pub enum LivenessEvent { + #[default] + None = 0, + Pause = 1, + Heartbeat = 2, +} + +impl LivenessEvent { + const fn from_u8(value: u8) -> Option { + match value { + 1 => Some(LivenessEvent::Pause), + 2 => Some(LivenessEvent::Heartbeat), + _ => None, + } + } +} + +const EMPTY_VERSION_INFO: &'static VersionInfo = &VersionInfo { + name: "", + branch: "", + commit_id: "", + rev_count: "", + compiler: "", + compile_time: "", + revision: "", +}; + +#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)] +pub struct ComponentId { + pub module: &'static str, + pub id: u32, +} + +impl ComponentId { + pub const fn new(module: &'static str, id: u32) -> Self { + Self { module, id } + } +} + +#[derive(Clone, Debug)] +pub struct ComponentSpec { + pub id: ComponentId, + pub display_name: Cow<'static, str>, + pub required: bool, + pub timeout_ms: u64, +} + +impl Default for ComponentSpec { + fn default() -> Self { + Self { + id: ComponentId::default(), + display_name: Cow::Borrowed(""), + required: true, + timeout_ms: 0, + } + } +} + +struct ComponentState { + id: ComponentId, + display_name: Cow<'static, str>, + required: bool, + timeout_ms: AtomicU64, + // Monotonically increasing generation distinguishes a fresh registration from a stale handle + // that still exists briefly during component restart/recreation. + generation: u64, + running: AtomicBool, + last_heartbeat_mono_ms: AtomicU64, + last_event: AtomicU8, + // Stores the latest callsite observed through #[track_caller] on heartbeat/pause. + last_location: AtomicPtr>, +} + +impl ComponentState { + fn new(spec: ComponentSpec, generation: u64) -> Self { + Self { + id: spec.id, + display_name: spec.display_name, + required: spec.required, + timeout_ms: AtomicU64::new(spec.timeout_ms), + generation, + running: AtomicBool::new(false), + last_heartbeat_mono_ms: AtomicU64::new(u64::MAX), + last_event: AtomicU8::new(LivenessEvent::None as u8), + last_location: AtomicPtr::new(std::ptr::null_mut()), + } + } + + fn snapshot(&self, now_mono_ms: u64) -> ComponentSnapshot { + let last_heartbeat_mono_ms = self.last_heartbeat_mono_ms.load(Ordering::Relaxed); + let last_event = LivenessEvent::from_u8(self.last_event.load(Ordering::Relaxed)); + let location_ptr = self.last_location.load(Ordering::Relaxed); + let last_location = if location_ptr.is_null() { + None + } else { + // The pointer always originates from &'static Location::caller() + Some(unsafe { &*location_ptr }) + }; + ComponentSnapshot { + module: self.id.module, + id: self.id.id, + display_name: self.display_name.clone(), + running: self.running.load(Ordering::Relaxed), + required: self.required, + timeout_ms: self.timeout_ms.load(Ordering::Relaxed), + last_heartbeat_ago_ms: if last_heartbeat_mono_ms == u64::MAX { + None + } else { + Some(now_mono_ms.saturating_sub(last_heartbeat_mono_ms)) + }, + last_event, + last_location: last_location.map(|loc| SourceLocation { + file: loc.file(), + line: loc.line(), + }), + } + } +} + +struct RegistryInner { + started_at: Instant, + version: &'static VersionInfo, + next_generation: AtomicU64, + components: Mutex>>, +} + +impl RegistryInner { + fn mono_ms(&self) -> u64 { + self.started_at.elapsed().as_millis().min(u64::MAX as u128) as u64 + } + + fn deregister(&self, id: ComponentId, generation: u64) { + let mut components = self.components.lock(); + let should_remove = components + .get(&id) + .map(|state| state.generation == generation) + .unwrap_or(false); + if should_remove { + trace!( + "liveness deregistered component: module={} id={} generation={}", + id.module, + id.id, + generation + ); + components.remove(&id); + } else { + trace!( + "liveness ignored stale deregistration: module={} id={} generation={}", + id.module, + id.id, + generation + ); + } + } +} + +#[derive(Clone)] +pub struct LivenessRegistry { + inner: Arc, +} + +impl Default for LivenessRegistry { + fn default() -> Self { + Self::new(EMPTY_VERSION_INFO) + } +} + +impl LivenessRegistry { + pub fn new(version_info: &'static VersionInfo) -> Self { + Self { + inner: Arc::new(RegistryInner { + started_at: Instant::now(), + version: version_info, + next_generation: AtomicU64::new(1), + components: Mutex::new(HashMap::new()), + }), + } + } + + pub fn register(&self, spec: ComponentSpec) -> LivenessHandle { + let generation = self.inner.next_generation.fetch_add(1, Ordering::Relaxed); + let component_id = spec.id; + let display_name = spec.display_name.clone(); + let required = spec.required; + let timeout_ms = spec.timeout_ms; + let state = Arc::new(ComponentState::new(spec, generation)); + let mut components = self.inner.components.lock(); + if let Some(old) = components.insert(component_id, state.clone()) { + warn!( + "liveness component re-registered: module={} id={} old_display_name={} new_display_name={}", + component_id.module, component_id.id, old.display_name, display_name + ); + } + debug!( + "liveness registered component: module={} id={} display_name={} generation={} required={} timeout_ms={}", + component_id.module, + component_id.id, + display_name, + generation, + required, + timeout_ms + ); + LivenessHandle(Some(Arc::new(HandleInner { + registry: Arc::downgrade(&self.inner), + state, + }))) + } + + pub fn report(&self) -> LivenessReport { + let now_mono_ms = self.inner.mono_ms(); + let mut components = self + .inner + .components + .lock() + .values() + .map(|state| state.snapshot(now_mono_ms)) + .collect::>(); + components.sort_by(|a, b| (&a.module, a.id).cmp(&(&b.module, b.id))); + let failed_components = components + .iter() + .filter(|component| { + component.running + && component.required + && component + .last_heartbeat_ago_ms + .map(|elapsed| elapsed > component.timeout_ms) + .unwrap_or(true) + }) + .cloned() + .collect::>(); + if !failed_components.is_empty() { + debug!( + "liveness report detected {} failed component(s)", + failed_components.len() + ); + } + LivenessReport { + status: if failed_components.is_empty() { + "ok" + } else { + "fail" + }, + version: self.inner.version, + uptime_ms: now_mono_ms, + failed_components, + components, + } + } +} + +pub fn register(registry: Option<&LivenessRegistry>, spec: ComponentSpec) -> LivenessHandle { + registry + .map(|registry| registry.register(spec)) + .unwrap_or_else(LivenessHandle::disabled) +} + +#[derive(Clone, Default)] +pub struct LivenessHandle(Option>); + +struct HandleInner { + registry: Weak, + state: Arc, +} + +impl HandleInner { + // Every state transition funnels through this helper so the report always carries + // a consistent (event, timestamp, callsite) tuple. + fn update( + &self, + running: Option, + event: LivenessEvent, + location: Option<&'static Location<'static>>, + ) { + let Some(registry) = self.registry.upgrade() else { + return; + }; + let now_mono_ms = registry.mono_ms(); + if let Some(running) = running { + self.state.running.store(running, Ordering::Relaxed); + } + self.state + .last_heartbeat_mono_ms + .store(now_mono_ms, Ordering::Relaxed); + self.state.last_event.store(event as u8, Ordering::Relaxed); + self.state.last_location.store( + location + .map(|loc| loc as *const Location<'static> as *mut Location<'static>) + .unwrap_or(std::ptr::null_mut()), + Ordering::Relaxed, + ); + trace!( + "liveness update: module={} id={} event={:?} running={:?}", + self.state.id.module, + self.state.id.id, + event, + running + ); + } +} + +impl Drop for HandleInner { + fn drop(&mut self) { + if let Some(registry) = self.registry.upgrade() { + registry.deregister(self.state.id, self.state.generation); + } + } +} + +impl LivenessHandle { + pub fn disabled() -> Self { + Self(None) + } + + #[track_caller] + pub fn heartbeat(&self) { + if let Some(inner) = self.0.as_ref() { + // The first heartbeat implicitly marks the component as running, which keeps + // long-lived loops and short-lived worker threads on the same API. + inner.update( + Some(true), + LivenessEvent::Heartbeat, + Some(Location::caller()), + ); + } + } + + #[track_caller] + pub fn pause(&self) { + if let Some(inner) = self.0.as_ref() { + inner.update(Some(false), LivenessEvent::Pause, Some(Location::caller())); + } + } + + pub fn set_timeout_ms(&self, timeout_ms: u64) { + if let Some(inner) = self.0.as_ref() { + inner.state.timeout_ms.store(timeout_ms, Ordering::Relaxed); + trace!( + "liveness timeout updated: module={} id={} timeout_ms={}", + inner.state.id.module, + inner.state.id.id, + timeout_ms + ); + } + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)] +pub struct SourceLocation { + pub file: &'static str, + pub line: u32, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +pub struct ComponentSnapshot { + pub module: &'static str, + pub id: u32, + pub display_name: Cow<'static, str>, + pub running: bool, + pub required: bool, + pub timeout_ms: u64, + pub last_heartbeat_ago_ms: Option, + pub last_event: Option, + pub last_location: Option, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +pub struct LivenessReport { + pub status: &'static str, + pub version: &'static VersionInfo, + pub uptime_ms: u64, + pub failed_components: Vec, + pub components: Vec, +} + +pub struct LivenessServer { + runtime: Arc, + registry: LivenessRegistry, + port: u16, + running: AtomicBool, + task: Mutex>>, + shutdown_tx: Mutex>>, +} + +impl LivenessServer { + pub fn new(runtime: Arc, registry: LivenessRegistry, port: u16) -> Self { + Self { + runtime, + registry, + port, + running: AtomicBool::new(false), + task: Mutex::new(None), + shutdown_tx: Mutex::new(None), + } + } + + pub fn start(&self) -> Result<(), hyper::Error> { + if self.running.swap(true, Ordering::Relaxed) { + return Ok(()); + } + + let _runtime_guard = self.runtime.enter(); + let addr = ([0, 0, 0, 0], self.port).into(); + let server_builder = match Server::try_bind(&addr) { + Ok(builder) => builder, + Err(e) => { + self.running.store(false, Ordering::Relaxed); + return Err(e); + } + }; + let registry = self.registry.clone(); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + *self.shutdown_tx.lock() = Some(shutdown_tx); + self.task.lock().replace(self.runtime.spawn(async move { + let service = make_service_fn(move |_| { + let registry = registry.clone(); + async move { + Ok::<_, Infallible>(service_fn(move |req| { + let registry = registry.clone(); + async move { Ok::<_, Infallible>(Self::handle_request(registry, req)) } + })) + } + }); + + info!("liveness probe listening on http://{addr}/livez"); + let server = server_builder.serve(service).with_graceful_shutdown(async { + let _ = shutdown_rx.await; + }); + if let Err(e) = server.await { + error!("liveness probe server error: {e}"); + } + })); + Ok(()) + } + + pub fn stop(&self) { + if !self.running.swap(false, Ordering::Relaxed) { + return; + } + + if let Some(tx) = self.shutdown_tx.lock().take() { + let _ = tx.send(()); + } + if let Some(task) = self.task.lock().take() { + let _ = self.runtime.block_on(task); + } + } + + fn handle_request(registry: LivenessRegistry, req: Request) -> Response { + match (req.method(), req.uri().path()) { + (&Method::GET, "/livez") => { + let report = registry.report(); + let status = if report.status == "ok" { + StatusCode::OK + } else { + StatusCode::INTERNAL_SERVER_ERROR + }; + match serde_json::to_vec(&report) { + Ok(body) => Response::builder() + .status(status) + .header(CONTENT_TYPE, "application/json") + .body(Body::from(body)) + .unwrap(), + Err(e) => Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(format!( + r#"{{"status":"fail","error":"serialize liveness report failed: {e}"}}"# + ))) + .unwrap(), + } + } + _ => Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::from("Not Found")) + .unwrap(), + } + } +} + +impl Drop for LivenessServer { + fn drop(&mut self) { + self.stop(); + } +} + +#[cfg(test)] +mod tests { + use std::net::TcpListener; + + use hyper::{body::to_bytes, Client, StatusCode}; + use serde_json::Value; + + use super::*; + + const TEST_VERSION_INFO: &VersionInfo = &VersionInfo { + name: "deepflow-agent-ce", + branch: "test", + commit_id: "deadbeef", + rev_count: "1", + compiler: "", + compile_time: "now", + revision: "test 1-deadbeef", + }; + + #[test] + fn duplicate_registration_replaces_state() { + let registry = LivenessRegistry::new(TEST_VERSION_INFO); + let first = registry.register(ComponentSpec { + id: ComponentId::new("test", 1), + display_name: "first".into(), + timeout_ms: 1000, + ..Default::default() + }); + first.heartbeat(); + + let second = registry.register(ComponentSpec { + id: ComponentId::new("test", 1), + display_name: "second".into(), + timeout_ms: 2000, + ..Default::default() + }); + second.heartbeat(); + drop(first); + + let report = registry.report(); + assert_eq!(report.components.len(), 1); + assert_eq!(report.components[0].display_name, "second"); + assert_eq!(report.components[0].timeout_ms, 2000); + } + + #[test] + fn handle_drop_deregisters_component() { + let registry = LivenessRegistry::new(TEST_VERSION_INFO); + let handle = registry.register(ComponentSpec { + id: ComponentId::new("test", 2), + display_name: "test".into(), + timeout_ms: 1000, + ..Default::default() + }); + handle.heartbeat(); + assert_eq!(registry.report().components.len(), 1); + + drop(handle); + + assert!(registry.report().components.is_empty()); + } + + #[test] + fn stopped_component_does_not_fail_liveness() { + let registry = LivenessRegistry::new(TEST_VERSION_INFO); + let handle = registry.register(ComponentSpec { + id: ComponentId::new("test", 3), + display_name: "test".into(), + timeout_ms: 0, + ..Default::default() + }); + handle.heartbeat(); + handle.pause(); + + let report = registry.report(); + assert_eq!(report.status, "ok"); + assert!(!report.components[0].running); + } + + #[test] + fn http_server_returns_liveness_report() { + let runtime = Arc::new(Runtime::new().unwrap()); + let registry = LivenessRegistry::new(TEST_VERSION_INFO); + let handle = registry.register(ComponentSpec { + id: ComponentId::new("test", 4), + display_name: "test".into(), + timeout_ms: 60_000, + ..Default::default() + }); + handle.heartbeat(); + assert_eq!(registry.report().status, "ok"); + + let listener = TcpListener::bind(("127.0.0.1", 0)).unwrap(); + let port = listener.local_addr().unwrap().port(); + drop(listener); + + let server = LivenessServer::new(runtime.clone(), registry.clone(), port); + server.start().unwrap(); + + let client = Client::new(); + let response = runtime.block_on(async move { + client + .get(format!("http://127.0.0.1:{port}/livez").parse().unwrap()) + .await + .unwrap() + }); + assert_eq!(response.status(), StatusCode::OK); + let body = runtime.block_on(async { to_bytes(response.into_body()).await.unwrap() }); + let report: Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(report["status"], "ok"); + assert_eq!(report["version"]["commit_id"], "deadbeef"); + assert_eq!(report["components"].as_array().unwrap().len(), 1); + } +} diff --git a/agent/src/rpc/synchronizer.rs b/agent/src/rpc/synchronizer.rs index e216c07fd93..9c0ab06d2e0 100644 --- a/agent/src/rpc/synchronizer.rs +++ b/agent/src/rpc/synchronizer.rs @@ -71,6 +71,7 @@ use crate::{ }, config::{config, UserConfig}, exception::ExceptionHandler, + liveness::{self, ComponentId, ComponentSpec, LivenessRegistry}, platform, rpc::session::Session, trident::{self, AgentId, AgentState, ChangedConfig, RunningMode, State, VersionInfo}, @@ -612,10 +613,12 @@ pub struct Synchronizer { agent_mode: RunningMode, standalone_runtime_config: Option, ipmac_tx: Arc>, + liveness_registry: Option, } impl Synchronizer { const LOG_THRESHOLD: usize = 3; + const LIVENESS_TIMEOUT_MS: u64 = 90_000; pub fn new( runtime: Arc, @@ -635,6 +638,7 @@ impl Synchronizer { standalone_runtime_config: Option, ipmac_tx: Arc>, ntp_diff: Arc, + liveness_registry: Option, ) -> Synchronizer { Synchronizer { static_config: Arc::new(StaticConfig { @@ -669,6 +673,39 @@ impl Synchronizer { agent_mode, standalone_runtime_config, ipmac_tx, + liveness_registry, + } + } + + fn sync_liveness_spec() -> ComponentSpec { + ComponentSpec { + id: ComponentId::new("synchronizer", 0), + display_name: "synchronizer sync".into(), + // Synchronizer already has escape/restart handling, so liveness here is kept only + // for debugging visibility and should not fail the global probe. + required: false, + timeout_ms: Self::LIVENESS_TIMEOUT_MS, + ..Default::default() + } + } + + fn triggered_liveness_spec() -> ComponentSpec { + ComponentSpec { + id: ComponentId::new("synchronizer", 1), + display_name: "synchronizer triggered".into(), + required: false, + timeout_ms: Self::LIVENESS_TIMEOUT_MS, + ..Default::default() + } + } + + fn standalone_liveness_spec() -> ComponentSpec { + ComponentSpec { + id: ComponentId::new("synchronizer", 2), + display_name: "synchronizer standalone".into(), + required: false, + timeout_ms: Self::LIVENESS_TIMEOUT_MS, + ..Default::default() } } @@ -1311,10 +1348,14 @@ impl Synchronizer { let flow_acl_listener = self.flow_acl_listener.clone(); let exception_handler = self.exception_handler.clone(); let ntp_diff = self.ntp_diff.clone(); + let liveness_registry = self.liveness_registry.clone(); let mut ntp_receiver = ntp_receiver.take().unwrap(); self.threads.lock().push(self.runtime.spawn(async move { + let liveness = + liveness::register(liveness_registry.as_ref(), Self::triggered_liveness_spec()); let mut grpc_failed_count = 0; while running.load(Ordering::SeqCst) { + liveness.heartbeat(); let response = session .grpc_push_with_statsd(Synchronizer::generate_sync_request( &agent_id, @@ -1339,6 +1380,7 @@ impl Synchronizer { let mut stream = response.unwrap().into_inner(); while running.load(Ordering::SeqCst) { + liveness.heartbeat(); let message = stream.message().await; if session.get_version() != version { info!("grpc server or config changed"); @@ -1399,6 +1441,7 @@ impl Synchronizer { } } + liveness.heartbeat(); Self::on_response( session.get_current_server(), message, @@ -1808,8 +1851,12 @@ impl Synchronizer { let mut sync_interval = DEFAULT_SYNC_INTERVAL; let standalone_runtime_config = self.standalone_runtime_config.as_ref().unwrap().clone(); let flow_acl_listener = self.flow_acl_listener.clone(); + let liveness_registry = self.liveness_registry.clone(); self.threads.lock().push(self.runtime.spawn(async move { + let liveness = + liveness::register(liveness_registry.as_ref(), Self::standalone_liveness_spec()); while running.load(Ordering::SeqCst) { + liveness.heartbeat(); let mut user_config = match UserConfig::load_from_file(standalone_runtime_config.as_path()) { Ok(c) => c, @@ -1878,10 +1925,14 @@ impl Synchronizer { let max_memory = self.max_memory.clone(); let exception_handler = self.exception_handler.clone(); let ntp_diff = self.ntp_diff.clone(); + let liveness_registry = self.liveness_registry.clone(); let mut ntp_receiver = ntp_receiver.take().unwrap(); self.threads.lock().push(self.runtime.spawn(async move { + let liveness = + liveness::register(liveness_registry.as_ref(), Self::sync_liveness_spec()); let mut grpc_failed_count = 0; while running.load(Ordering::SeqCst) { + liveness.heartbeat(); let upgrade_hostname = |s: &str| { let r = status.upgradable_read(); if s.ne(&r.hostname) { @@ -1919,6 +1970,7 @@ impl Synchronizer { ); debug!("grpc sync request: {:?}", request); + liveness.heartbeat(); let response = session.grpc_sync_with_statsd(request).await; if let Err(m) = response { let (ip, port) = session.get_current_server(); @@ -1934,6 +1986,7 @@ impl Synchronizer { session.set_request_failed(false); grpc_failed_count = 0; + liveness.heartbeat(); Self::on_response( session.get_current_server(), response.unwrap().into_inner(), diff --git a/agent/src/trident.rs b/agent/src/trident.rs index d7e29c4f836..1bf1c691f1d 100644 --- a/agent/src/trident.rs +++ b/agent/src/trident.rs @@ -38,6 +38,7 @@ use flexi_logger::{ }; use log::{debug, error, info, warn}; use num_enum::{FromPrimitive, IntoPrimitive}; +use serde::Serialize; use tokio::runtime::{Builder, Runtime}; use tokio::sync::broadcast; use zstd::Encoder as ZstdEncoder; @@ -78,6 +79,7 @@ use crate::{ ApplicationLog, BoxedPrometheusExtra, Datadog, MetricServer, OpenTelemetry, OpenTelemetryCompressed, Profile, TelegrafMetric, }, + liveness::{self, ComponentId, ComponentSpec, LivenessRegistry, LivenessServer}, metric::document::BoxedDocument, monitor::Monitor, platform::synchronizer::Synchronizer as PlatformSynchronizer, @@ -138,6 +140,8 @@ use public::{netns, packet, queue::Receiver}; const MINUTE: Duration = Duration::from_secs(60); const COMMON_DELAY: u64 = 5; // Potential delay from other processing steps in flow_map +const MAIN_LOOP_LIVENESS_TIMEOUT: Duration = Duration::from_secs(10); +const MAIN_LOOP_COMPONENT_TIMEOUT_MS: u64 = 60_000; const QG_PROCESS_MAX_DELAY: u64 = 5; // FIXME: Potential delay from processing steps in qg, it is an estimated value and is not accurate; the data processing capability of the quadruple_generator should be optimized. #[derive(Debug, Default)] @@ -300,6 +304,7 @@ impl AgentState { } } +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)] pub struct VersionInfo { pub name: &'static str, pub branch: &'static str, @@ -747,6 +752,34 @@ impl Trident { .build() .unwrap(), ); + let liveness_registry = config_handler + .static_config + .liveness_probe_enabled + .then(|| LivenessRegistry::new(version_info)); + let liveness_server = liveness_registry + .as_ref() + .map(|registry| { + let server = LivenessServer::new( + runtime.clone(), + registry.clone(), + config_handler.static_config.liveness_probe_port, + ); + server + .start() + .map_err(|e| anyhow!("start liveness probe failed: {e}"))?; + Ok::<_, anyhow::Error>(server) + }) + .transpose()?; + let main_loop_liveness = liveness_registry.as_ref(); + let main_loop_liveness = liveness::register( + main_loop_liveness, + ComponentSpec { + id: ComponentId::new("main-loop", 0), + display_name: "main loop".into(), + timeout_ms: MAIN_LOOP_COMPONENT_TIMEOUT_MS, + ..Default::default() + }, + ); let mut k8s_opaque_id = None; if matches!( @@ -762,7 +795,6 @@ impl Trident { let (ipmac_tx, _) = broadcast::channel::(1); let ipmac_tx = Arc::new(ipmac_tx); - let synchronizer = Arc::new(Synchronizer::new( runtime.clone(), session.clone(), @@ -781,6 +813,7 @@ impl Trident { config_path, ipmac_tx.clone(), ntp_diff, + liveness_registry.clone(), )); stats_collector.register_countable( &stats::NoTagModule("ntp"), @@ -851,6 +884,7 @@ impl Trident { cgroup_mount_path, is_cgroup_v2, cgroups_disabled, + liveness_registry.clone(), ) { Ok(g) => g, Err(e) => { @@ -926,10 +960,12 @@ impl Trident { let mut config_initialized = false; loop { + main_loop_liveness.heartbeat(); let mut state_guard = state.state.lock().unwrap(); if state.terminated.load(Ordering::Relaxed) { mem::drop(state_guard); if let Some(mut c) = components { + main_loop_liveness.heartbeat(); c.stop(); guard.stop(); monitor.stop(); @@ -946,10 +982,21 @@ impl Trident { } } } + main_loop_liveness.pause(); + drop(liveness_server); return Ok(()); } - state_guard = state.notifier.wait(state_guard).unwrap(); + let wait_result = state + .notifier + .wait_timeout(state_guard, MAIN_LOOP_LIVENESS_TIMEOUT) + .unwrap(); + state_guard = wait_result.0; + if wait_result.1.timed_out() { + mem::drop(state_guard); + continue; + } + main_loop_liveness.heartbeat(); match State::from(state_guard.0) { State::Running if state_guard.1.is_none() => { mem::drop(state_guard); @@ -964,6 +1011,7 @@ impl Trident { api_watcher.stop(); } if let Some(ref mut c) = components { + main_loop_liveness.heartbeat(); c.start(); } continue; @@ -976,6 +1024,7 @@ impl Trident { } if let Some(cfg) = new_config { let agent_id = synchronizer.agent_id.read().clone(); + main_loop_liveness.heartbeat(); let callbacks = config_handler.on_config( cfg.user_config, &exception_handler, @@ -989,6 +1038,7 @@ impl Trident { first_run, ); first_run = false; + main_loop_liveness.heartbeat(); #[cfg(target_os = "linux")] if config_handler @@ -1002,6 +1052,7 @@ impl Trident { } if let Some(Components::Agent(c)) = components.as_mut() { + main_loop_liveness.heartbeat(); for callback in callbacks { callback(&config_handler, c); } @@ -1020,6 +1071,7 @@ impl Trident { if !config_initialized { // start guard on receiving first config to ensure // the meltdown thresholds are set by the config + main_loop_liveness.heartbeat(); guard.start(); config_initialized = true; } @@ -1057,6 +1109,7 @@ impl Trident { let agent_id = synchronizer.agent_id.read().clone(); match components.as_mut() { None => { + main_loop_liveness.heartbeat(); let callbacks = config_handler.on_config( user_config, &exception_handler, @@ -1070,6 +1123,7 @@ impl Trident { first_run, ); first_run = false; + main_loop_liveness.heartbeat(); #[cfg(target_os = "linux")] if config_handler @@ -1082,12 +1136,14 @@ impl Trident { api_watcher.stop(); } + main_loop_liveness.heartbeat(); let mut comp = Components::new( &version_info, &config_handler, stats_collector.clone(), &session, &synchronizer, + liveness_registry.clone(), exception_handler.clone(), #[cfg(target_os = "linux")] libvirt_xml_extractor.clone(), @@ -1104,6 +1160,7 @@ impl Trident { ipmac_tx.clone(), )?; + main_loop_liveness.heartbeat(); comp.start(); if let Components::Agent(components) = &mut comp { @@ -1113,6 +1170,7 @@ impl Trident { parse_tap_type(components, tap_types); } + main_loop_liveness.heartbeat(); for callback in callbacks { callback(&config_handler, components); } @@ -1121,6 +1179,7 @@ impl Trident { components.replace(comp); } Some(Components::Agent(components)) => { + main_loop_liveness.heartbeat(); let callbacks: Vec = config_handler .on_config( user_config, @@ -1135,6 +1194,7 @@ impl Trident { first_run, ); first_run = false; + main_loop_liveness.heartbeat(); #[cfg(target_os = "linux")] if config_handler @@ -1148,8 +1208,10 @@ impl Trident { } components.config = config_handler.candidate_config.clone(); + main_loop_liveness.heartbeat(); components.start(); + main_loop_liveness.heartbeat(); component_on_config_change( &config_handler, components, @@ -1161,6 +1223,7 @@ impl Trident { #[cfg(target_os = "linux")] libvirt_xml_extractor.clone(), ); + main_loop_liveness.heartbeat(); for callback in callbacks { callback(&config_handler, components); } @@ -1326,6 +1389,7 @@ fn component_on_config_change( components.toa_info_sender.clone(), components.l4_flow_aggr_sender.clone(), components.metrics_sender.clone(), + components.liveness_registry.clone(), #[cfg(target_os = "linux")] netns::NsFile::Root, #[cfg(target_os = "linux")] @@ -1447,6 +1511,7 @@ fn component_on_config_change( components.toa_info_sender.clone(), components.l4_flow_aggr_sender.clone(), components.metrics_sender.clone(), + components.liveness_registry.clone(), #[cfg(target_os = "linux")] netns::NsFile::Root, #[cfg(target_os = "linux")] @@ -1833,6 +1898,7 @@ pub struct AgentComponents { pub last_dispatcher_component_id: usize, #[cfg(any(target_os = "linux", target_os = "android"))] pub process_listener: Arc, + pub liveness_registry: Option, max_memory: u64, capture_mode: PacketCaptureType, agent_mode: RunningMode, @@ -2119,6 +2185,7 @@ impl AgentComponents { stats_collector: Arc, session: &Arc, synchronizer: &Arc, + liveness_registry: Option, exception_handler: ExceptionHandler, #[cfg(target_os = "linux")] libvirt_xml_extractor: Arc, platform_synchronizer: Arc, @@ -2676,6 +2743,7 @@ impl AgentComponents { toa_sender.clone(), l4_flow_aggr_sender.clone(), metrics_sender.clone(), + liveness_registry.clone(), #[cfg(target_os = "linux")] netns, #[cfg(target_os = "linux")] @@ -3234,6 +3302,7 @@ impl AgentComponents { bpf_options, #[cfg(any(target_os = "linux", target_os = "android"))] process_listener, + liveness_registry, }) } @@ -3438,6 +3507,7 @@ impl Components { stats_collector: Arc, session: &Arc, synchronizer: &Arc, + liveness_registry: Option, exception_handler: ExceptionHandler, #[cfg(target_os = "linux")] libvirt_xml_extractor: Arc, platform_synchronizer: Arc, @@ -3461,6 +3531,7 @@ impl Components { stats_collector, session, synchronizer, + liveness_registry, exception_handler, #[cfg(target_os = "linux")] libvirt_xml_extractor, @@ -3551,6 +3622,7 @@ fn build_dispatchers( toa_info_sender: DebugSender>, l4_flow_aggr_sender: DebugSender, metrics_sender: DebugSender, + liveness_registry: Option, #[cfg(target_os = "linux")] netns: netns::NsFile, #[cfg(target_os = "linux")] kubernetes_poller: Arc, #[cfg(target_os = "linux")] libvirt_xml_extractor: Arc, @@ -3790,6 +3862,7 @@ fn build_dispatchers( .pcap_interfaces(pcap_interfaces.clone()) .tunnel_type_trim_bitmap(dispatcher_config.tunnel_type_trim_bitmap) .bond_group(dispatcher_config.bond_group.clone()) + .liveness_registry(liveness_registry.clone()) .analyzer_raw_packet_block_size( user_config.inputs.cbpf.tunning.raw_packet_buffer_block_size, ); diff --git a/agent/src/utils/guard.rs b/agent/src/utils/guard.rs index 955cb18c50d..6f4f09ba81c 100644 --- a/agent/src/utils/guard.rs +++ b/agent/src/utils/guard.rs @@ -50,6 +50,7 @@ use crate::common::{ }; use crate::config::handler::EnvironmentAccess; use crate::exception::ExceptionHandler; +use crate::liveness::{self, ComponentId, ComponentSpec, LivenessHandle, LivenessRegistry}; use crate::rpc::get_timestamp; use crate::trident::AgentState; use crate::utils::environment::get_disk_usage; @@ -265,9 +266,17 @@ pub struct Guard { system: Arc>, pid: Pid, cgroups_disabled: bool, + liveness: LivenessHandle, } impl Guard { + fn liveness_timeout_ms(guard_interval: Duration) -> u64 { + guard_interval + .as_millis() + .saturating_mul(2) + .min(u64::MAX as u128) as u64 + } + pub fn new( config: EnvironmentAccess, state: Arc, @@ -276,10 +285,20 @@ impl Guard { cgroup_mount_path: String, is_cgroup_v2: bool, cgroups_disabled: bool, + liveness_registry: Option, ) -> Result { let Ok(pid) = get_current_pid() else { return Err("get the process' pid failed: {}, deepflow-agent restart..."); }; + let liveness = liveness::register( + liveness_registry.as_ref(), + ComponentSpec { + id: ComponentId::new("guard", 0), + display_name: "guard".into(), + timeout_ms: Self::liveness_timeout_ms(config.load().guard_interval), + ..Default::default() + }, + ); Ok(Self { config, state, @@ -294,6 +313,7 @@ impl Guard { system: Arc::new(Mutex::new(System::new())), pid, cgroups_disabled, + liveness, }) } @@ -540,9 +560,11 @@ impl Guard { #[cfg(target_os = "linux")] let mut last_page_reclaim = Instant::now(); let feed = Arc::new(Feed::default()); + let mut current_timeout_ms = Self::liveness_timeout_ms(config.load().guard_interval); self.running_watchdog.store(true, Relaxed); self.start_watchdog(feed.clone()); + let liveness = self.liveness.clone(); let thread = thread::Builder::new().name("guard".to_owned()).spawn(move || { let mut system_load = SystemLoadGuard::new(system.clone(), exception_handler.clone()); @@ -551,20 +573,29 @@ impl Guard { let feed = feed.clone(); feed.add(FeedTitle::Init); + liveness.heartbeat(); loop { let config = config.load(); + let timeout_ms = Self::liveness_timeout_ms(config.guard_interval); + if timeout_ms != current_timeout_ms { + liveness.set_timeout_ms(timeout_ms); + current_timeout_ms = timeout_ms; + } let capture_mode = config.capture_mode; let cpu_limit = config.max_millicpus; let mut system_guard = system.lock().unwrap(); feed.add(FeedTitle::SystemGuard); + liveness.heartbeat(); if !system_guard.refresh_process_specifics(pid, ProcessRefreshKind::new().with_cpu()) { warn!("refresh process with cpu failed"); } drop(system_guard); feed.add(FeedTitle::SystemLoad); + liveness.heartbeat(); system_load.check(config.system_load_circuit_breaker_threshold, config.system_load_circuit_breaker_recover, config.system_load_circuit_breaker_metric); feed.add(FeedTitle::FileSize); + liveness.heartbeat(); match get_file_and_size_sum(&log_dir) { Ok(file_and_size_sum) => { let file_sizes_sum = file_and_size_sum.file_sizes_sum; // Total size of current log files (unit: B) @@ -577,6 +608,7 @@ impl Guard { error!("log files' size is over log_file_size_limit, current: {}B, log_file_size_limit: {}B", file_sizes_sum, config.log_file_size); feed.add(FeedTitle::ReleaseLog); + liveness.heartbeat(); Self::release_log_files(file_and_size_sum, config.log_file_size); exception_handler.set(Exception::LogFileExceeded, None); } else { @@ -592,6 +624,7 @@ impl Guard { if cgroups_available && !cgroups_disabled { if check_cgroup_result { feed.add(FeedTitle::CheckCgroups); + liveness.heartbeat(); check_cgroup_result = Self::check_cgroups(cgroup_mount_path.clone(), is_cgroup_v2); if !check_cgroup_result { warn!("check cgroups failed, limit cpu or memory without cgroups"); @@ -599,6 +632,7 @@ impl Guard { } if !check_cgroup_result { feed.add(FeedTitle::CheckCpu1); + liveness.heartbeat(); if !Self::check_cpu(system.clone(), pid.clone(), cpu_limit) { if over_cpu_limit { error!("cpu usage over cpu limit twice, deepflow-agent restart..."); @@ -614,6 +648,7 @@ impl Guard { } } else { feed.add(FeedTitle::CheckCpu2); + liveness.heartbeat(); if !Self::check_cpu(system.clone(), pid.clone(), cpu_limit) { if over_cpu_limit { error!("cpu usage over cpu limit twice, deepflow-agent restart..."); @@ -632,6 +667,7 @@ impl Guard { #[cfg(all(target_os = "linux", target_env = "gnu"))] if config.idle_memory_trimming { feed.add(FeedTitle::MallocTrim); + liveness.heartbeat(); unsafe { let _ = malloc_trim(0); } } @@ -639,6 +675,7 @@ impl Guard { if last_page_reclaim.elapsed() >= Duration::from_secs(60) { last_page_reclaim = Instant::now(); feed.add(FeedTitle::PageCache); + liveness.heartbeat(); let _ = crate::utils::cgroups::page_cache_reclaim_check(config.page_cache_reclaim_percentage); } @@ -651,6 +688,7 @@ impl Guard { let memory_limit = config.max_memory; if memory_limit != 0 { feed.add(FeedTitle::GetMemory); + liveness.heartbeat(); match get_memory_rss() { Ok(memory_usage) => { if memory_usage >= memory_limit { @@ -678,9 +716,11 @@ impl Guard { } feed.add(FeedTitle::SysFree); + liveness.heartbeat(); Self::check_sys_memory(config.sys_memory_limit as f64, config.sys_memory_metric, &mut under_sys_free_memory_limit, &mut last_exceeded, &exception_handler); feed.add(FeedTitle::ThreadNum); + liveness.heartbeat(); match get_thread_num() { Ok(thread_num) => { let thread_limit = config.thread_threshold; @@ -706,11 +746,13 @@ impl Guard { if !in_container { feed.add(FeedTitle::FreeDisk); + liveness.heartbeat(); Self::check_free_disk(config.free_disk_circuit_breaker_percentage_threshold, config.free_disk_circuit_breaker_absolute_threshold, &config.free_disk_circuit_breaker_directories, &exception_handler); } feed.add(FeedTitle::Exception); + liveness.heartbeat(); if exception_handler.has(Exception::SystemLoadCircuitBreaker) { warn!("Set the state to melt_down when the system load exceeds the threshold."); state.melt_down(); @@ -737,6 +779,7 @@ impl Guard { } feed.add(FeedTitle::SocketInfo); + liveness.heartbeat(); #[cfg(target_os = "linux")] match SocketInfo::get() { Ok(SocketInfo { tcp, tcp6, udp, udp6 }) => { @@ -771,17 +814,20 @@ impl Guard { } feed.add(FeedTitle::RunningLock); + liveness.heartbeat(); let (running, notifier) = &*running_state; let mut rg = running.lock().unwrap(); if !*rg { break; } feed.add(FeedTitle::WaitTimeout); + liveness.heartbeat(); rg = notifier.wait_timeout(rg, config.guard_interval).unwrap().0; if !*rg { break; } } + liveness.pause(); info!("guard exited"); }).unwrap();