Skip to content

Commit 1130b84

Browse files
committed
agent: add liveness probe component
1 parent 488a057 commit 1130b84

15 files changed

+834
-9
lines changed

agent/config/deepflow-agent.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ controller-ips:
3939
## kubernetes api watcher
4040
#async-worker-thread-number: 16
4141

42+
## Enable the HTTP liveness probe endpoint, defaults to false
43+
#liveness-probe-enabled: false
44+
## Listen port of the HTTP liveness probe endpoint, defaults to 39090
45+
#liveness-probe-port: 39090
46+
4247
## Type of agent identifier, choose from [ip-and-mac, ip], defaults to "ip-and-mac"
4348
#agent-unique-identifier: ip-and-mac
4449

agent/src/config/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,8 @@ pub struct Config {
135135
pub pid_file: String,
136136
pub team_id: String,
137137
pub cgroups_disabled: bool,
138+
pub liveness_probe_enabled: bool,
139+
pub liveness_probe_port: u16,
138140
}
139141

140142
impl Config {
@@ -297,6 +299,8 @@ impl Default for Config {
297299
pid_file: Default::default(),
298300
team_id: "".into(),
299301
cgroups_disabled: false,
302+
liveness_probe_enabled: false,
303+
liveness_probe_port: 39090,
300304
}
301305
}
302306
}
@@ -3975,6 +3979,8 @@ mod tests {
39753979
.expect("failed loading config file");
39763980
assert_eq!(c.controller_ips.len(), 1);
39773981
assert_eq!(&c.controller_ips[0], "127.0.0.1");
3982+
assert!(!c.liveness_probe_enabled);
3983+
assert_eq!(c.liveness_probe_port, 39090);
39783984
}
39793985

39803986
#[test]

agent/src/dispatcher/analyzer_mode_dispatcher.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::{
2020
ops::Add,
2121
sync::{atomic::Ordering, Arc, RwLock},
2222
thread::{self, JoinHandle},
23-
time::Duration,
23+
time::{Duration, Instant},
2424
};
2525

2626
use arc_swap::access::Access;
@@ -47,6 +47,7 @@ use crate::{
4747
},
4848
flow_generator::{flow_map::Config, FlowMap},
4949
handler::{MiniPacket, PacketHandler},
50+
liveness::{DebugInfo, LivenessHandle},
5051
rpc::get_timestamp,
5152
utils::{
5253
bytes::read_u32_be,
@@ -153,6 +154,7 @@ pub(super) struct AnalyzerModeDispatcher {
153154
pub(super) pool_raw_size: usize,
154155
pub(super) flow_generator_thread_handler: Option<JoinHandle<()>>,
155156
pub(super) pipeline_thread_handler: Option<JoinHandle<()>>,
157+
pub(super) flow_generator_liveness: LivenessHandle,
156158
pub(super) queue_debugger: Arc<QueueDebugger>,
157159
pub(super) stats_collector: Arc<stats::Collector>,
158160
pub(super) inner_queue_size: usize,
@@ -270,6 +272,7 @@ impl AnalyzerModeDispatcher {
270272
let collector_config = base.collector_config.clone();
271273
let packet_sequence_output_queue = base.packet_sequence_output_queue.clone(); // Enterprise Edition Feature: packet-sequence
272274
let stats = base.stats.clone();
275+
let liveness = self.flow_generator_liveness.clone();
273276
#[cfg(any(target_os = "linux", target_os = "android"))]
274277
let cpu_set = base.options.lock().unwrap().cpu_set;
275278

@@ -280,6 +283,7 @@ impl AnalyzerModeDispatcher {
280283
let mut timestamp_map: HashMap<CaptureNetworkType, Duration> = HashMap::new();
281284
let mut batch = Vec::with_capacity(HANDLER_BATCH_SIZE);
282285
let mut output_batch = Vec::with_capacity(HANDLER_BATCH_SIZE);
286+
let mut last_liveness = Instant::now();
283287
let mut flow_map = FlowMap::new(
284288
id as u32,
285289
Some(flow_output_queue),
@@ -298,8 +302,13 @@ impl AnalyzerModeDispatcher {
298302
warn!("CPU Affinity({:?}) bind error: {:?}.", &cpu_set, e);
299303
}
300304
}
305+
liveness.start(DebugInfo::new("running"));
301306

302307
while !terminated.load(Ordering::Relaxed) {
308+
if last_liveness.elapsed() >= Duration::from_secs(1) {
309+
liveness.heartbeat(DebugInfo::new("running"));
310+
last_liveness = Instant::now();
311+
}
303312
let config = Config {
304313
flow: &flow_map_config.load(),
305314
log_parser: &log_parser_config.load(),
@@ -433,6 +442,7 @@ impl AnalyzerModeDispatcher {
433442
output_batch.clear();
434443
}
435444
}
445+
liveness.stop(DebugInfo::new("stopped"));
436446
})
437447
.unwrap(),
438448
);
@@ -543,6 +553,8 @@ impl AnalyzerModeDispatcher {
543553
let id = base.id;
544554
let mut batch = Vec::with_capacity(HANDLER_BATCH_SIZE);
545555
let mut allocator = Allocator::new(self.raw_packet_block_size);
556+
let mut last_liveness = Instant::now();
557+
base.liveness_handle.start(DebugInfo::new("running"));
546558
#[cfg(any(target_os = "linux", target_os = "android"))]
547559
let cpu_set = base.options.lock().unwrap().cpu_set;
548560
#[cfg(any(target_os = "linux", target_os = "android"))]
@@ -553,6 +565,10 @@ impl AnalyzerModeDispatcher {
553565
}
554566

555567
while !base.terminated.load(Ordering::Relaxed) {
568+
if last_liveness.elapsed() >= Duration::from_secs(1) {
569+
base.liveness_handle.heartbeat(DebugInfo::new("running"));
570+
last_liveness = Instant::now();
571+
}
556572
if base.reset_whitelist.swap(false, Ordering::Relaxed) {
557573
base.tap_interface_whitelist.reset();
558574
}
@@ -615,6 +631,7 @@ impl AnalyzerModeDispatcher {
615631
let _ = handler.join();
616632
}
617633

634+
base.liveness_handle.stop(DebugInfo::new("stopped"));
618635
self.base.terminate_handler();
619636
info!("Stopped dispatcher {}", self.base.is.log_id);
620637
}

agent/src/dispatcher/base_dispatcher.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ use crate::{
5454
exception::ExceptionHandler,
5555
flow_generator::AppProto,
5656
handler::PacketHandlerBuilder,
57+
liveness::LivenessHandle,
5758
policy::PolicyGetter,
5859
rpc::get_timestamp,
5960
utils::{bytes::read_u16_be, stats::Collector},
@@ -139,6 +140,7 @@ pub(super) struct InternalState {
139140
// dispatcher id for easy debugging
140141
pub log_id: String,
141142
pub promisc_if_indices: Vec<i32>,
143+
pub liveness_handle: LivenessHandle,
142144
}
143145

144146
impl BaseDispatcher {

agent/src/dispatcher/local_mode_dispatcher.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::str;
2121
use std::sync::atomic::Ordering;
2222
#[cfg(target_os = "linux")]
2323
use std::sync::Arc;
24-
use std::time::Duration;
24+
use std::time::{Duration, Instant};
2525

2626
use arc_swap::access::Access;
2727
use log::{debug, info, log_enabled, warn};
@@ -37,6 +37,7 @@ use super::{
3737
error::Result,
3838
TunnelTypeBitmap,
3939
};
40+
use crate::liveness::DebugInfo;
4041

4142
#[cfg(target_os = "linux")]
4243
use crate::platform::{GenericPoller, LibvirtXmlExtractor, Poller};
@@ -201,6 +202,8 @@ impl LocalModeDispatcher {
201202
info!("Start dispatcher {}", base.log_id);
202203
let time_diff = base.ntp_diff.load(Ordering::Relaxed);
203204
let mut prev_timestamp = get_timestamp(time_diff);
205+
let mut last_liveness = Instant::now();
206+
base.liveness_handle.start(DebugInfo::new("running"));
204207
#[cfg(any(target_os = "linux", target_os = "android"))]
205208
let cpu_set = base.options.lock().unwrap().cpu_set;
206209
#[cfg(any(target_os = "linux", target_os = "android"))]
@@ -228,6 +231,10 @@ impl LocalModeDispatcher {
228231
let mut collector_config = base.collector_config.load().clone();
229232

230233
while !base.terminated.load(Ordering::Relaxed) {
234+
if last_liveness.elapsed() >= Duration::from_secs(1) {
235+
base.liveness_handle.heartbeat(DebugInfo::new("running"));
236+
last_liveness = Instant::now();
237+
}
231238
if base.need_reload_config.swap(false, Ordering::Relaxed) {
232239
info!("dispatcher reload config");
233240
flow_config = base.flow_map_config.load().clone();
@@ -299,6 +306,7 @@ impl LocalModeDispatcher {
299306
base.check_and_update_bpf(&mut self.base.engine);
300307
}
301308

309+
base.liveness_handle.stop(DebugInfo::new("stopped"));
302310
self.base.terminate_handler();
303311
info!("Stopped dispatcher {}", self.base.is.log_id);
304312
}

agent/src/dispatcher/local_multins_mode_dispatcher.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77
Arc, Mutex,
88
},
99
thread::{self, JoinHandle},
10-
time::Duration,
10+
time::{Duration, Instant},
1111
};
1212

1313
use arc_swap::access::Access;
@@ -33,6 +33,7 @@ use crate::{
3333
config::handler::DispatcherAccess,
3434
exception::ExceptionHandler,
3535
flow_generator::{flow_map::Config, FlowMap},
36+
liveness::{DebugInfo, LivenessHandle},
3637
rpc::get_timestamp,
3738
utils::stats::QueueStats,
3839
};
@@ -44,13 +45,15 @@ pub struct LocalMultinsModeDispatcher {
4445
base: BaseDispatcher,
4546

4647
receiver_manager: Option<JoinHandle<()>>,
48+
receiver_manager_liveness: LivenessHandle,
4749
}
4850

4951
impl LocalMultinsModeDispatcher {
50-
pub fn new(base: BaseDispatcher) -> Self {
52+
pub fn new(base: BaseDispatcher, receiver_manager_liveness: LivenessHandle) -> Self {
5153
Self {
5254
base,
5355
receiver_manager: None,
56+
receiver_manager_liveness,
5457
}
5558
}
5659

@@ -84,6 +87,7 @@ impl LocalMultinsModeDispatcher {
8487
counter: base.counter.clone(),
8588
ntp_diff: base.ntp_diff.clone(),
8689
bpf_controls: bpf_controls.clone(),
90+
liveness: self.receiver_manager_liveness.clone(),
8791
output: packet_input,
8892
};
8993
self.receiver_manager.replace(
@@ -109,9 +113,15 @@ impl LocalMultinsModeDispatcher {
109113
let tunnel_type_trim_bitmap = base.tunnel_type_trim_bitmap.clone();
110114
let mut batch = Vec::with_capacity(PACKET_BATCH_SIZE);
111115
let mut tap_interface_whitelists: HashMap<u64, TapInterfaceWhitelist> = HashMap::new();
116+
let mut last_liveness = Instant::now();
112117

113118
super::set_cpu_affinity(&base.options);
119+
base.liveness_handle.start(DebugInfo::new("running"));
114120
while !base.terminated.load(Ordering::Relaxed) {
121+
if last_liveness.elapsed() >= Duration::from_secs(1) {
122+
base.liveness_handle.heartbeat(DebugInfo::new("running"));
123+
last_liveness = Instant::now();
124+
}
115125
let config = Config {
116126
flow: &base.flow_map_config.load(),
117127
log_parser: &base.log_parser_config.load(),
@@ -225,6 +235,7 @@ impl LocalMultinsModeDispatcher {
225235
});
226236
}
227237
}
238+
base.liveness_handle.stop(DebugInfo::new("stopped"));
228239
info!("Stopping local multi-namespace dispatcher");
229240
info!("Wait for receiver manager to stop");
230241
self.receiver_manager.take().unwrap().join().unwrap();
@@ -513,6 +524,7 @@ struct ReceiverManager {
513524
exception_handler: ExceptionHandler,
514525
counter: Arc<PacketCounter>,
515526
ntp_diff: Arc<AtomicI64>,
527+
liveness: LivenessHandle,
516528

517529
output: DebugSender<Packet>,
518530
}
@@ -551,11 +563,13 @@ impl ReceiverManager {
551563
super::set_cpu_affinity(&self.options);
552564

553565
info!("Receiver manager started");
566+
self.liveness.start(DebugInfo::new("running"));
554567

555568
let mut loop_count = 0;
556569
let mut zombie_threads = vec![];
557570
let mut receiver_threads: HashMap<NsFile, PktReceiverHandle> = HashMap::new();
558571
while !self.terminated.load(Ordering::Relaxed) {
572+
self.liveness.heartbeat(DebugInfo::new("running"));
559573
loop_count = (loop_count + 1) % Self::INTERVAL_SECS;
560574
if loop_count != 1 {
561575
// actual interval is (INTERVAL_SECS - 1) to make this simple and less error prone
@@ -762,6 +776,7 @@ impl ReceiverManager {
762776
let _ = handle.join_handle.take().unwrap().join();
763777
}
764778

779+
self.liveness.stop(DebugInfo::new("stopped"));
765780
info!("Receiver manager stopped");
766781
}
767782
}

agent/src/dispatcher/local_plus_mode_dispatcher.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::process::Command;
2323
use std::str;
2424
use std::sync::{atomic::Ordering, Arc};
2525
use std::thread::{self, JoinHandle};
26-
use std::time::Duration;
26+
use std::time::{Duration, Instant};
2727

2828
use arc_swap::access::Access;
2929
use log::{debug, info, log_enabled, warn};
@@ -49,6 +49,7 @@ use crate::{
4949
config::DispatcherConfig,
5050
flow_generator::{flow_map::Config, FlowMap},
5151
handler::MiniPacket,
52+
liveness::{DebugInfo, LivenessHandle},
5253
rpc::get_timestamp,
5354
utils::{
5455
bytes::read_u16_be,
@@ -73,6 +74,7 @@ pub(super) struct LocalPlusModeDispatcher {
7374
pub(super) stats_collector: Arc<stats::Collector>,
7475
pub(super) flow_generator_thread_handler: Option<JoinHandle<()>>,
7576
pub(super) pipeline_thread_handler: Option<JoinHandle<()>>,
77+
pub(super) flow_generator_liveness: LivenessHandle,
7678
pub(super) inner_queue_size: usize,
7779
pub(super) raw_packet_block_size: usize,
7880
pub(super) pool_raw_size: usize,
@@ -113,6 +115,7 @@ impl LocalPlusModeDispatcher {
113115
let npb_dedup_enabled = base.npb_dedup_enabled.clone();
114116
let pool_raw_size = self.pool_raw_size;
115117
let tunnel_type_trim_bitmap = base.tunnel_type_trim_bitmap.clone();
118+
let liveness = self.flow_generator_liveness.clone();
116119
#[cfg(any(target_os = "linux", target_os = "android"))]
117120
let cpu_set = base.options.lock().unwrap().cpu_set;
118121

@@ -122,6 +125,7 @@ impl LocalPlusModeDispatcher {
122125
.spawn(move || {
123126
let mut batch = Vec::with_capacity(HANDLER_BATCH_SIZE);
124127
let mut output_batch = Vec::with_capacity(HANDLER_BATCH_SIZE);
128+
let mut last_liveness = Instant::now();
125129
let mut flow_map = FlowMap::new(
126130
id as u32,
127131
Some(flow_output_queue),
@@ -140,8 +144,13 @@ impl LocalPlusModeDispatcher {
140144
warn!("CPU Affinity({:?}) bind error: {:?}.", &cpu_set, e);
141145
}
142146
}
147+
liveness.start(DebugInfo::new("running"));
143148

144149
while !terminated.load(Ordering::Relaxed) {
150+
if last_liveness.elapsed() >= Duration::from_secs(1) {
151+
liveness.heartbeat(DebugInfo::new("running"));
152+
last_liveness = Instant::now();
153+
}
145154
let config = Config {
146155
flow: &flow_map_config.load(),
147156
log_parser: &log_parser_config.load(),
@@ -291,6 +300,7 @@ impl LocalPlusModeDispatcher {
291300
output_batch.clear();
292301
}
293302
}
303+
liveness.stop(DebugInfo::new("stopped"));
294304
})
295305
.unwrap(),
296306
);
@@ -385,8 +395,14 @@ impl LocalPlusModeDispatcher {
385395
let id = base.id;
386396
let mut batch = Vec::with_capacity(HANDLER_BATCH_SIZE);
387397
let mut allocator = Allocator::new(self.raw_packet_block_size);
398+
let mut last_liveness = Instant::now();
399+
base.liveness_handle.start(DebugInfo::new("running"));
388400

389401
while !base.terminated.load(Ordering::Relaxed) {
402+
if last_liveness.elapsed() >= Duration::from_secs(1) {
403+
base.liveness_handle.heartbeat(DebugInfo::new("running"));
404+
last_liveness = Instant::now();
405+
}
390406
if base.reset_whitelist.swap(false, Ordering::Relaxed) {
391407
base.tap_interface_whitelist.reset();
392408
}
@@ -450,6 +466,7 @@ impl LocalPlusModeDispatcher {
450466
let _ = handler.join();
451467
}
452468

469+
base.liveness_handle.stop(DebugInfo::new("stopped"));
453470
self.base.terminate_handler();
454471
info!("Stopped dispatcher {}", self.base.is.log_id);
455472
}

0 commit comments

Comments
 (0)