Skip to content

Commit 247d1b3

Browse files
rvqlkylewanginchina
authored andcommitted
agent: add watchdog subprocess for non-k8s liveness
1 parent 9229b92 commit 247d1b3

4 files changed

Lines changed: 247 additions & 0 deletions

File tree

agent/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ pub mod rpc;
4040
mod sender;
4141
pub mod trident;
4242
pub mod utils;
43+
#[cfg(unix)]
44+
pub mod watchdog;
4345

4446
// for benchmarks
4547
#[doc(hidden)]

agent/src/main.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,14 @@ struct Opts {
7474
/// Disable cgroups, deepflow-agent will default to checking the CPU and memory resource usage in a loop every 10 seconds to prevent resource usage from exceeding limits.
7575
#[clap(long)]
7676
cgroups_disabled: bool,
77+
78+
#[cfg(unix)]
79+
#[clap(long, hide = true)]
80+
watchdog_parent_pid: Option<u32>,
81+
82+
#[cfg(unix)]
83+
#[clap(long, hide = true)]
84+
watchdog_liveness_url: Option<String>,
7785
}
7886

7987
#[cfg(unix)]
@@ -116,6 +124,15 @@ fn main() -> Result<()> {
116124
println!("{}", VERSION_INFO);
117125
return Ok(());
118126
}
127+
#[cfg(unix)]
128+
if let Some(parent_pid) = opts.watchdog_parent_pid {
129+
return watchdog::run(
130+
parent_pid,
131+
opts.watchdog_liveness_url
132+
.as_deref()
133+
.ok_or_else(|| anyhow::anyhow!("watchdog liveness url is required"))?,
134+
);
135+
}
119136
let mut t = trident::Trident::start(
120137
&Path::new(&opts.config_file),
121138
VERSION_INFO,

agent/src/trident.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ use tokio::runtime::{Builder, Runtime};
4343
use tokio::sync::broadcast;
4444
use zstd::Encoder as ZstdEncoder;
4545

46+
#[cfg(unix)]
47+
use crate::watchdog;
4648
use crate::{
4749
collector::{
4850
flow_aggr::FlowAggrThread, quadruple_generator::QuadrupleGeneratorThread, CollectorThread,
@@ -770,6 +772,25 @@ impl Trident {
770772
Ok::<_, anyhow::Error>(server)
771773
})
772774
.transpose()?;
775+
#[cfg(unix)]
776+
if liveness_server.is_some() && !running_in_k8s() {
777+
let url = watchdog::liveness_url(config_handler.static_config.liveness_probe_port);
778+
match watchdog::spawn(std::process::id(), &url) {
779+
Ok(mut child) => {
780+
let pid = child.id();
781+
info!("spawned watchdog child pid={} for {}", pid, url);
782+
thread::Builder::new()
783+
.name("watchdog-reaper".to_owned())
784+
.spawn(move || {
785+
if let Err(e) = child.wait() {
786+
warn!("watchdog child pid={} wait failed: {}", pid, e);
787+
}
788+
})
789+
.ok();
790+
}
791+
Err(e) => warn!("failed to spawn watchdog child: {}", e),
792+
}
793+
}
773794
let main_loop_liveness = liveness_registry.as_ref();
774795
let main_loop_liveness = liveness::register(
775796
main_loop_liveness,

agent/src/watchdog.rs

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
/*
2+
* Copyright (c) 2024 Yunshan Networks
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#![cfg(unix)]
18+
19+
use std::{
20+
io::{BufRead, BufReader, Write},
21+
net::{IpAddr, SocketAddr, TcpStream, ToSocketAddrs},
22+
process::{Child, Command},
23+
time::{Duration, Instant},
24+
};
25+
26+
use anyhow::{anyhow, Context, Result};
27+
use http::Uri;
28+
use nix::{
29+
errno::Errno,
30+
sys::signal::{kill, Signal},
31+
unistd::Pid,
32+
};
33+
34+
use crate::utils::environment::get_executable_path;
35+
36+
pub const WATCHDOG_FAILURE_THRESHOLD: u32 = 3;
37+
pub const WATCHDOG_PERIOD: Duration = Duration::from_secs(10);
38+
pub const WATCHDOG_HTTP_TIMEOUT: Duration = Duration::from_secs(3);
39+
pub const WATCHDOG_TERMINATION_GRACE: Duration = Duration::from_secs(10);
40+
41+
pub fn liveness_url(port: u16) -> String {
42+
format!("http://127.0.0.1:{port}/livez")
43+
}
44+
45+
struct ProbeTarget {
46+
addr: SocketAddr,
47+
request: Vec<u8>,
48+
}
49+
50+
impl ProbeTarget {
51+
fn from_url(liveness_url: &str) -> Result<Self> {
52+
let uri: Uri = liveness_url
53+
.parse()
54+
.with_context(|| format!("parse watchdog liveness url {liveness_url} failed"))?;
55+
let addr = parse_addr(&uri)?;
56+
let path = uri.path_and_query().map(|p| p.as_str()).unwrap_or("/livez");
57+
let host = uri.host().unwrap_or("127.0.0.1");
58+
let request = format!("GET {path} HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n")
59+
.into_bytes();
60+
Ok(Self { addr, request })
61+
}
62+
63+
fn probe_once(&self) -> Result<bool> {
64+
let mut stream = TcpStream::connect_timeout(&self.addr, WATCHDOG_HTTP_TIMEOUT)
65+
.with_context(|| format!("connect {} failed", self.addr))?;
66+
stream.set_read_timeout(Some(WATCHDOG_HTTP_TIMEOUT))?;
67+
stream.set_write_timeout(Some(WATCHDOG_HTTP_TIMEOUT))?;
68+
stream.write_all(&self.request)?;
69+
stream.flush()?;
70+
71+
let mut status_line = String::new();
72+
let mut reader = BufReader::new(stream);
73+
reader.read_line(&mut status_line)?;
74+
let code = status_line
75+
.split_whitespace()
76+
.nth(1)
77+
.ok_or_else(|| anyhow!("invalid HTTP status line: {}", status_line.trim()))?;
78+
Ok(code == "200")
79+
}
80+
}
81+
82+
pub fn run(parent_pid: u32, liveness_url: &str) -> Result<()> {
83+
let parent_pid = Pid::from_raw(parent_pid as i32);
84+
let probe_target = ProbeTarget::from_url(liveness_url)?;
85+
let mut consecutive_failures = 0;
86+
87+
eprintln!(
88+
"[watchdog] monitoring parent pid {} via {}",
89+
parent_pid, liveness_url
90+
);
91+
loop {
92+
if !process_exists(parent_pid)? {
93+
eprintln!(
94+
"[watchdog] parent pid {} exited, watchdog stopping",
95+
parent_pid
96+
);
97+
return Ok(());
98+
}
99+
100+
match probe_target.probe_once() {
101+
Ok(true) => {
102+
consecutive_failures = 0;
103+
}
104+
Ok(false) => {
105+
consecutive_failures += 1;
106+
eprintln!(
107+
"[watchdog] liveness returned unhealthy status for parent pid {}, consecutive_failures={}",
108+
parent_pid, consecutive_failures
109+
);
110+
}
111+
Err(e) => {
112+
consecutive_failures += 1;
113+
eprintln!(
114+
"[watchdog] liveness probe for parent pid {} failed: {}, consecutive_failures={}",
115+
parent_pid, e, consecutive_failures
116+
);
117+
}
118+
}
119+
120+
if consecutive_failures >= WATCHDOG_FAILURE_THRESHOLD {
121+
eprintln!(
122+
"[watchdog] parent pid {} exceeded liveness failure threshold {}, restarting",
123+
parent_pid, WATCHDOG_FAILURE_THRESHOLD
124+
);
125+
return terminate_parent(parent_pid);
126+
}
127+
128+
std::thread::sleep(WATCHDOG_PERIOD);
129+
}
130+
}
131+
132+
pub fn spawn(parent_pid: u32, liveness_url: &str) -> Result<Child> {
133+
let binary = get_executable_path().context("get executable path for watchdog failed")?;
134+
Command::new(binary)
135+
.arg("--watchdog-parent-pid")
136+
.arg(parent_pid.to_string())
137+
.arg("--watchdog-liveness-url")
138+
.arg(liveness_url)
139+
.spawn()
140+
.context("spawn watchdog failed")
141+
}
142+
143+
fn process_exists(pid: Pid) -> Result<bool> {
144+
match kill(pid, None) {
145+
Ok(_) => Ok(true),
146+
Err(Errno::EPERM) => Ok(true),
147+
Err(Errno::ESRCH) => Ok(false),
148+
Err(e) => Err(anyhow!("check parent pid {} failed: {}", pid, e)),
149+
}
150+
}
151+
152+
fn parse_addr(uri: &Uri) -> Result<SocketAddr> {
153+
let host = uri
154+
.host()
155+
.ok_or_else(|| anyhow!("watchdog liveness url missing host"))?;
156+
let port = uri.port_u16().unwrap_or(80);
157+
if let Ok(ip) = host.parse::<IpAddr>() {
158+
return Ok(SocketAddr::new(ip, port));
159+
}
160+
(host, port)
161+
.to_socket_addrs()?
162+
.next()
163+
.ok_or_else(|| anyhow!("resolve watchdog host {host}:{port} failed"))
164+
}
165+
166+
fn terminate_parent(parent_pid: Pid) -> Result<()> {
167+
match kill(parent_pid, Signal::SIGTERM) {
168+
Ok(_) => eprintln!("[watchdog] sent SIGTERM to parent pid {}", parent_pid),
169+
Err(Errno::ESRCH) => return Ok(()),
170+
Err(e) => {
171+
return Err(anyhow!(
172+
"send SIGTERM to parent pid {} failed: {}",
173+
parent_pid,
174+
e
175+
))
176+
}
177+
}
178+
179+
let deadline = Instant::now() + WATCHDOG_TERMINATION_GRACE;
180+
while Instant::now() < deadline {
181+
if !process_exists(parent_pid)? {
182+
eprintln!("[watchdog] parent pid {} exited after SIGTERM", parent_pid);
183+
return Ok(());
184+
}
185+
std::thread::sleep(Duration::from_millis(200));
186+
}
187+
188+
if !process_exists(parent_pid)? {
189+
return Ok(());
190+
}
191+
192+
match kill(parent_pid, Signal::SIGKILL) {
193+
Ok(_) => {
194+
eprintln!(
195+
"[watchdog] parent pid {} did not exit in {:?}, sent SIGKILL",
196+
parent_pid, WATCHDOG_TERMINATION_GRACE
197+
);
198+
Ok(())
199+
}
200+
Err(Errno::ESRCH) => Ok(()),
201+
Err(e) => Err(anyhow!(
202+
"send SIGKILL to parent pid {} failed: {}",
203+
parent_pid,
204+
e
205+
)),
206+
}
207+
}

0 commit comments

Comments
 (0)