Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions agent/config/deepflow-agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ controller-ips:
## kubernetes api watcher
#async-worker-thread-number: 16

## Enable HTTP liveness probe endpoint
#liveness-probe-enabled: true
## HTTP liveness probe listen port
#liveness-probe-port: 39090
Comment thread
kylewanginchina marked this conversation as resolved.

## Type of agent identifier, choose from [ip-and-mac, ip], defaults to "ip-and-mac"
#agent-unique-identifier: ip-and-mac

Expand Down
6 changes: 6 additions & 0 deletions agent/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ pub struct Config {
pub pid_file: String,
pub team_id: String,
pub cgroups_disabled: bool,
pub liveness_probe_enabled: bool,
pub liveness_probe_port: u16,
}

impl Config {
Expand Down Expand Up @@ -297,6 +299,8 @@ impl Default for Config {
pid_file: Default::default(),
team_id: "".into(),
cgroups_disabled: false,
liveness_probe_enabled: true,
liveness_probe_port: 39090,
}
}
}
Expand Down Expand Up @@ -3975,6 +3979,8 @@ mod tests {
.expect("failed loading config file");
assert_eq!(c.controller_ips.len(), 1);
assert_eq!(&c.controller_ips[0], "127.0.0.1");
assert!(c.liveness_probe_enabled);
assert_eq!(c.liveness_probe_port, 39090);
}

#[test]
Expand Down
34 changes: 32 additions & 2 deletions agent/src/dispatcher/analyzer_mode_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use crate::{
},
flow_generator::{flow_map::Config, FlowMap},
handler::{MiniPacket, PacketHandler},
liveness::{self, ComponentId, ComponentSpec, LivenessRegistry},
rpc::get_timestamp,
utils::{
bytes::read_u32_be,
Expand Down Expand Up @@ -153,6 +154,7 @@ pub(super) struct AnalyzerModeDispatcher {
pub(super) pool_raw_size: usize,
pub(super) flow_generator_thread_handler: Option<JoinHandle<()>>,
pub(super) pipeline_thread_handler: Option<JoinHandle<()>>,
pub(super) liveness_registry: Option<LivenessRegistry>,
pub(super) queue_debugger: Arc<QueueDebugger>,
pub(super) stats_collector: Arc<stats::Collector>,
pub(super) inner_queue_size: usize,
Expand Down Expand Up @@ -270,13 +272,23 @@ impl AnalyzerModeDispatcher {
let collector_config = base.collector_config.clone();
let packet_sequence_output_queue = base.packet_sequence_output_queue.clone(); // Enterprise Edition Feature: packet-sequence
let stats = base.stats.clone();
let liveness_registry = self.liveness_registry.clone();
#[cfg(any(target_os = "linux", target_os = "android"))]
let cpu_set = base.options.lock().unwrap().cpu_set;

self.flow_generator_thread_handler.replace(
thread::Builder::new()
.name("dispatcher-packet-to-flow-generator".to_owned())
.spawn(move || {
let liveness = liveness::register(
liveness_registry.as_ref(),
ComponentSpec {
id: ComponentId::new("dispatcher-flow-generator", id as u32),
display_name: "dispatcher analyzer flow generator".into(),
timeout_ms: BaseDispatcher::LIVENESS_TIMEOUT_MS,
..Default::default()
},
);
let mut timestamp_map: HashMap<CaptureNetworkType, Duration> = HashMap::new();
let mut batch = Vec::with_capacity(HANDLER_BATCH_SIZE);
let mut output_batch = Vec::with_capacity(HANDLER_BATCH_SIZE);
Expand All @@ -298,7 +310,6 @@ impl AnalyzerModeDispatcher {
warn!("CPU Affinity({:?}) bind error: {:?}.", &cpu_set, e);
}
}

while !terminated.load(Ordering::Relaxed) {
let config = Config {
flow: &flow_map_config.load(),
Expand All @@ -309,8 +320,11 @@ impl AnalyzerModeDispatcher {
};

match receiver.recv_all(&mut batch, Some(Duration::from_secs(1))) {
Ok(_) => {}
Ok(_) => {
liveness.heartbeat();
}
Err(queue::Error::Timeout) => {
liveness.heartbeat();
flow_map.inject_flush_ticker(&config, Duration::ZERO);
continue;
}
Expand Down Expand Up @@ -535,6 +549,15 @@ impl AnalyzerModeDispatcher {
}

pub(super) fn run(&mut self) {
let liveness_handle = liveness::register(
self.liveness_registry.as_ref(),
ComponentSpec {
id: ComponentId::new("dispatcher", self.base.is.id as u32),
display_name: "dispatcher analyzer".into(),
timeout_ms: BaseDispatcher::LIVENESS_TIMEOUT_MS,
..Default::default()
},
);
let sender_to_parser = self.setup_inner_thread_and_queue();
let base = &mut self.base.is;
info!("Start analyzer dispatcher {}", base.log_id);
Expand All @@ -543,6 +566,7 @@ impl AnalyzerModeDispatcher {
let id = base.id;
let mut batch = Vec::with_capacity(HANDLER_BATCH_SIZE);
let mut allocator = Allocator::new(self.raw_packet_block_size);
let mut last_liveness = Duration::ZERO;
#[cfg(any(target_os = "linux", target_os = "android"))]
let cpu_set = base.options.lock().unwrap().cpu_set;
#[cfg(any(target_os = "linux", target_os = "android"))]
Expand Down Expand Up @@ -574,6 +598,7 @@ impl AnalyzerModeDispatcher {
}
}
if recved.is_none() {
liveness_handle.heartbeat();
if base.tap_interface_whitelist.next_sync(Duration::ZERO) {
base.need_update_bpf.store(true, Ordering::Relaxed);
}
Expand All @@ -586,6 +611,10 @@ impl AnalyzerModeDispatcher {
}

let (packet, timestamp) = recved.unwrap();
if timestamp >= last_liveness + BaseDispatcher::LIVENESS_HEARTBEAT_INTERVAL {
liveness_handle.heartbeat();
last_liveness = timestamp;
}

// From here on, ANALYZER mode is different from LOCAL mode
base.counter.rx.fetch_add(1, Ordering::Relaxed);
Expand Down Expand Up @@ -615,6 +644,7 @@ impl AnalyzerModeDispatcher {
let _ = handler.join();
}

liveness_handle.pause();
self.base.terminate_handler();
info!("Stopped dispatcher {}", self.base.is.log_id);
}
Expand Down
3 changes: 3 additions & 0 deletions agent/src/dispatcher/base_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ pub(super) struct InternalState {
}

impl BaseDispatcher {
pub(super) const LIVENESS_TIMEOUT_MS: u64 = 60_000;
pub(super) const LIVENESS_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(10);

pub(super) fn prepare_flow(
meta_packet: &mut MetaPacket,
tap_type: CaptureNetworkType,
Expand Down
18 changes: 18 additions & 0 deletions agent/src/dispatcher/local_mode_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use crate::{
config::DispatcherConfig,
flow_generator::{flow_map::Config, FlowMap},
handler::MiniPacket,
liveness::{self, ComponentId, ComponentSpec, LivenessRegistry},
rpc::get_timestamp,
utils::bytes::read_u16_be,
};
Expand All @@ -59,6 +60,7 @@ use public::{

pub(super) struct LocalModeDispatcher {
pub(super) base: BaseDispatcher,
pub(super) liveness_registry: Option<LivenessRegistry>,
#[cfg(target_os = "linux")]
pub(super) extractor: Arc<LibvirtXmlExtractor>,
}
Expand Down Expand Up @@ -197,10 +199,20 @@ impl LocalModeDispatcher {
}

pub(super) fn run(&mut self) {
let liveness_handle = liveness::register(
self.liveness_registry.as_ref(),
ComponentSpec {
id: ComponentId::new("dispatcher", self.base.is.id as u32),
display_name: "dispatcher local".into(),
timeout_ms: BaseDispatcher::LIVENESS_TIMEOUT_MS,
..Default::default()
},
);
let base = &mut self.base.is;
info!("Start dispatcher {}", base.log_id);
let time_diff = base.ntp_diff.load(Ordering::Relaxed);
let mut prev_timestamp = get_timestamp(time_diff);
let mut last_liveness = Duration::ZERO;
#[cfg(any(target_os = "linux", target_os = "android"))]
let cpu_set = base.options.lock().unwrap().cpu_set;
#[cfg(any(target_os = "linux", target_os = "android"))]
Expand Down Expand Up @@ -257,6 +269,7 @@ impl LocalModeDispatcher {
)
};
if recved.is_none() {
liveness_handle.heartbeat();
flow_map.inject_flush_ticker(&config, Duration::ZERO);
if base.tap_interface_whitelist.next_sync(Duration::ZERO) {
base.need_update_bpf.store(true, Ordering::Relaxed);
Expand All @@ -269,6 +282,10 @@ impl LocalModeDispatcher {
continue;
}
let (mut packet, mut timestamp) = recved.unwrap();
if timestamp >= last_liveness + BaseDispatcher::LIVENESS_HEARTBEAT_INTERVAL {
liveness_handle.heartbeat();
last_liveness = timestamp;
}
let Some(meta_packet) = Self::process_packet(
base,
&config,
Expand Down Expand Up @@ -299,6 +316,7 @@ impl LocalModeDispatcher {
base.check_and_update_bpf(&mut self.base.engine);
}

liveness_handle.pause();
self.base.terminate_handler();
info!("Stopped dispatcher {}", self.base.is.log_id);
}
Expand Down
Loading
Loading