Skip to content

Commit dc3f931

Browse files
rvqlkylewanginchina
authored and committed
agent: add liveness probe component
1 parent c4becf4 commit dc3f931

15 files changed

+1070
-39
lines changed

agent/config/deepflow-agent.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ controller-ips:
3939
## kubernetes api watcher
4040
#async-worker-thread-number: 16
4141

42+
## Enable HTTP liveness probe endpoint
43+
#liveness-probe-enabled: true
44+
## HTTP liveness probe listen port
45+
#liveness-probe-port: 39090
46+
4247
## Type of agent identifier, choose from [ip-and-mac, ip], defaults to "ip-and-mac"
4348
#agent-unique-identifier: ip-and-mac
4449

agent/src/config/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,8 @@ pub struct Config {
135135
pub pid_file: String,
136136
pub team_id: String,
137137
pub cgroups_disabled: bool,
138+
pub liveness_probe_enabled: bool,
139+
pub liveness_probe_port: u16,
138140
}
139141

140142
impl Config {
@@ -297,6 +299,8 @@ impl Default for Config {
297299
pid_file: Default::default(),
298300
team_id: "".into(),
299301
cgroups_disabled: false,
302+
liveness_probe_enabled: true,
303+
liveness_probe_port: 39090,
300304
}
301305
}
302306
}
@@ -3975,6 +3979,8 @@ mod tests {
39753979
.expect("failed loading config file");
39763980
assert_eq!(c.controller_ips.len(), 1);
39773981
assert_eq!(&c.controller_ips[0], "127.0.0.1");
3982+
assert!(c.liveness_probe_enabled);
3983+
assert_eq!(c.liveness_probe_port, 39090);
39783984
}
39793985

39803986
#[test]

agent/src/dispatcher/analyzer_mode_dispatcher.rs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use crate::{
4747
},
4848
flow_generator::{flow_map::Config, FlowMap},
4949
handler::{MiniPacket, PacketHandler},
50+
liveness::{self, ComponentId, ComponentSpec, LivenessRegistry},
5051
rpc::get_timestamp,
5152
utils::{
5253
bytes::read_u32_be,
@@ -153,6 +154,7 @@ pub(super) struct AnalyzerModeDispatcher {
153154
pub(super) pool_raw_size: usize,
154155
pub(super) flow_generator_thread_handler: Option<JoinHandle<()>>,
155156
pub(super) pipeline_thread_handler: Option<JoinHandle<()>>,
157+
pub(super) liveness_registry: Option<LivenessRegistry>,
156158
pub(super) queue_debugger: Arc<QueueDebugger>,
157159
pub(super) stats_collector: Arc<stats::Collector>,
158160
pub(super) inner_queue_size: usize,
@@ -270,13 +272,23 @@ impl AnalyzerModeDispatcher {
270272
let collector_config = base.collector_config.clone();
271273
let packet_sequence_output_queue = base.packet_sequence_output_queue.clone(); // Enterprise Edition Feature: packet-sequence
272274
let stats = base.stats.clone();
275+
let liveness_registry = self.liveness_registry.clone();
273276
#[cfg(any(target_os = "linux", target_os = "android"))]
274277
let cpu_set = base.options.lock().unwrap().cpu_set;
275278

276279
self.flow_generator_thread_handler.replace(
277280
thread::Builder::new()
278281
.name("dispatcher-packet-to-flow-generator".to_owned())
279282
.spawn(move || {
283+
let liveness = liveness::register(
284+
liveness_registry.as_ref(),
285+
ComponentSpec {
286+
id: ComponentId::new("dispatcher-flow-generator", id as u32),
287+
display_name: "dispatcher analyzer flow generator".into(),
288+
timeout_ms: BaseDispatcher::LIVENESS_TIMEOUT_MS,
289+
..Default::default()
290+
},
291+
);
280292
let mut timestamp_map: HashMap<CaptureNetworkType, Duration> = HashMap::new();
281293
let mut batch = Vec::with_capacity(HANDLER_BATCH_SIZE);
282294
let mut output_batch = Vec::with_capacity(HANDLER_BATCH_SIZE);
@@ -298,7 +310,6 @@ impl AnalyzerModeDispatcher {
298310
warn!("CPU Affinity({:?}) bind error: {:?}.", &cpu_set, e);
299311
}
300312
}
301-
302313
while !terminated.load(Ordering::Relaxed) {
303314
let config = Config {
304315
flow: &flow_map_config.load(),
@@ -309,8 +320,11 @@ impl AnalyzerModeDispatcher {
309320
};
310321

311322
match receiver.recv_all(&mut batch, Some(Duration::from_secs(1))) {
312-
Ok(_) => {}
323+
Ok(_) => {
324+
liveness.heartbeat();
325+
}
313326
Err(queue::Error::Timeout) => {
327+
liveness.heartbeat();
314328
flow_map.inject_flush_ticker(&config, Duration::ZERO);
315329
continue;
316330
}
@@ -535,6 +549,15 @@ impl AnalyzerModeDispatcher {
535549
}
536550

537551
pub(super) fn run(&mut self) {
552+
let liveness_handle = liveness::register(
553+
self.liveness_registry.as_ref(),
554+
ComponentSpec {
555+
id: ComponentId::new("dispatcher", self.base.is.id as u32),
556+
display_name: "dispatcher analyzer".into(),
557+
timeout_ms: BaseDispatcher::LIVENESS_TIMEOUT_MS,
558+
..Default::default()
559+
},
560+
);
538561
let sender_to_parser = self.setup_inner_thread_and_queue();
539562
let base = &mut self.base.is;
540563
info!("Start analyzer dispatcher {}", base.log_id);
@@ -543,6 +566,7 @@ impl AnalyzerModeDispatcher {
543566
let id = base.id;
544567
let mut batch = Vec::with_capacity(HANDLER_BATCH_SIZE);
545568
let mut allocator = Allocator::new(self.raw_packet_block_size);
569+
let mut last_liveness = Duration::ZERO;
546570
#[cfg(any(target_os = "linux", target_os = "android"))]
547571
let cpu_set = base.options.lock().unwrap().cpu_set;
548572
#[cfg(any(target_os = "linux", target_os = "android"))]
@@ -574,6 +598,7 @@ impl AnalyzerModeDispatcher {
574598
}
575599
}
576600
if recved.is_none() {
601+
liveness_handle.heartbeat();
577602
if base.tap_interface_whitelist.next_sync(Duration::ZERO) {
578603
base.need_update_bpf.store(true, Ordering::Relaxed);
579604
}
@@ -586,6 +611,10 @@ impl AnalyzerModeDispatcher {
586611
}
587612

588613
let (packet, timestamp) = recved.unwrap();
614+
if timestamp >= last_liveness + BaseDispatcher::LIVENESS_HEARTBEAT_INTERVAL {
615+
liveness_handle.heartbeat();
616+
last_liveness = timestamp;
617+
}
589618

590619
// From here on, ANALYZER mode is different from LOCAL mode
591620
base.counter.rx.fetch_add(1, Ordering::Relaxed);
@@ -615,6 +644,7 @@ impl AnalyzerModeDispatcher {
615644
let _ = handler.join();
616645
}
617646

647+
liveness_handle.pause();
618648
self.base.terminate_handler();
619649
info!("Stopped dispatcher {}", self.base.is.log_id);
620650
}

agent/src/dispatcher/base_dispatcher.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,9 @@ pub(super) struct InternalState {
142142
}
143143

144144
impl BaseDispatcher {
145+
/// Liveness timeout, in milliseconds, passed as `ComponentSpec::timeout_ms`
/// when the dispatcher threads register themselves with the liveness registry.
pub(super) const LIVENESS_TIMEOUT_MS: u64 = 60_000;
146+
/// Minimum advance of the received packet timestamp before a dispatcher run
/// loop sends another heartbeat while packets are flowing; used to throttle
/// per-packet heartbeats.
pub(super) const LIVENESS_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(10);
147+
145148
pub(super) fn prepare_flow(
146149
meta_packet: &mut MetaPacket,
147150
tap_type: CaptureNetworkType,

agent/src/dispatcher/local_mode_dispatcher.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ use crate::{
4949
config::DispatcherConfig,
5050
flow_generator::{flow_map::Config, FlowMap},
5151
handler::MiniPacket,
52+
liveness::{self, ComponentId, ComponentSpec, LivenessRegistry},
5253
rpc::get_timestamp,
5354
utils::bytes::read_u16_be,
5455
};
@@ -59,6 +60,7 @@ use public::{
5960

6061
pub(super) struct LocalModeDispatcher {
6162
pub(super) base: BaseDispatcher,
63+
pub(super) liveness_registry: Option<LivenessRegistry>,
6264
#[cfg(target_os = "linux")]
6365
pub(super) extractor: Arc<LibvirtXmlExtractor>,
6466
}
@@ -197,10 +199,20 @@ impl LocalModeDispatcher {
197199
}
198200

199201
pub(super) fn run(&mut self) {
202+
let liveness_handle = liveness::register(
203+
self.liveness_registry.as_ref(),
204+
ComponentSpec {
205+
id: ComponentId::new("dispatcher", self.base.is.id as u32),
206+
display_name: "dispatcher local".into(),
207+
timeout_ms: BaseDispatcher::LIVENESS_TIMEOUT_MS,
208+
..Default::default()
209+
},
210+
);
200211
let base = &mut self.base.is;
201212
info!("Start dispatcher {}", base.log_id);
202213
let time_diff = base.ntp_diff.load(Ordering::Relaxed);
203214
let mut prev_timestamp = get_timestamp(time_diff);
215+
let mut last_liveness = Duration::ZERO;
204216
#[cfg(any(target_os = "linux", target_os = "android"))]
205217
let cpu_set = base.options.lock().unwrap().cpu_set;
206218
#[cfg(any(target_os = "linux", target_os = "android"))]
@@ -257,6 +269,7 @@ impl LocalModeDispatcher {
257269
)
258270
};
259271
if recved.is_none() {
272+
liveness_handle.heartbeat();
260273
flow_map.inject_flush_ticker(&config, Duration::ZERO);
261274
if base.tap_interface_whitelist.next_sync(Duration::ZERO) {
262275
base.need_update_bpf.store(true, Ordering::Relaxed);
@@ -269,6 +282,10 @@ impl LocalModeDispatcher {
269282
continue;
270283
}
271284
let (mut packet, mut timestamp) = recved.unwrap();
285+
if timestamp >= last_liveness + BaseDispatcher::LIVENESS_HEARTBEAT_INTERVAL {
286+
liveness_handle.heartbeat();
287+
last_liveness = timestamp;
288+
}
272289
let Some(meta_packet) = Self::process_packet(
273290
base,
274291
&config,
@@ -299,6 +316,7 @@ impl LocalModeDispatcher {
299316
base.check_and_update_bpf(&mut self.base.engine);
300317
}
301318

319+
liveness_handle.pause();
302320
self.base.terminate_handler();
303321
info!("Stopped dispatcher {}", self.base.is.log_id);
304322
}

0 commit comments

Comments
 (0)