Skip to content

Commit ef17235

Browse files
committed
feat(envoy-client): add ping health check
1 parent 0460eb7 commit ef17235

12 files changed

Lines changed: 303 additions & 28 deletions

File tree

Cargo.lock

Lines changed: 15 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
//! Minimal TCP forwarder with a `freeze` mode used by network-fault tests.
2+
//!
3+
//! Toxiproxy can stall traffic but always relays a peer's TCP close to the other side, which
4+
//! defeats tests that need to observe one peer's behavior when it has no signal that the other
5+
//! peer hung up. `FreezeProxy` is a single-purpose forwarder that supports a true black-hole
6+
//! mode: while frozen, bytes are read from each peer and discarded, and an EOF from either peer
7+
//! is held instead of being forwarded.
8+
9+
use std::net::SocketAddr;
10+
use std::sync::Arc;
11+
use std::sync::atomic::{AtomicBool, Ordering};
12+
13+
use anyhow::{Context, Result};
14+
use tokio::io::AsyncReadExt;
15+
use tokio::io::AsyncWriteExt;
16+
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
17+
use tokio::net::{TcpListener, TcpStream};
18+
19+
/// Handle to a running loopback TCP forwarder that can be switched into a black-hole mode.
///
/// Created by `FreezeProxy::start`. The forwarding tasks share the `frozen` flag with this
/// handle, so flipping it through `freeze` affects every proxied connection.
#[derive(Debug)]
pub struct FreezeProxy {
    /// Local address the proxy's listener is bound to.
    listen_addr: SocketAddr,
    /// Shared freeze flag; set to `true` once `freeze` has been called.
    frozen: Arc<AtomicBool>,
}
23+
24+
impl FreezeProxy {
25+
pub async fn start(upstream: SocketAddr) -> Result<Self> {
26+
let listener = TcpListener::bind("127.0.0.1:0")
27+
.await
28+
.context("failed to bind FreezeProxy listener")?;
29+
let listen_addr = listener
30+
.local_addr()
31+
.context("failed to read FreezeProxy listen addr")?;
32+
let frozen = Arc::new(AtomicBool::new(false));
33+
34+
tokio::spawn({
35+
let frozen = frozen.clone();
36+
async move {
37+
loop {
38+
let (client, _) = match listener.accept().await {
39+
Ok(pair) => pair,
40+
Err(err) => {
41+
tracing::warn!(?err, "freeze proxy accept failed");
42+
return;
43+
}
44+
};
45+
let _ = client.set_nodelay(true);
46+
let frozen = frozen.clone();
47+
tokio::spawn(async move {
48+
let server = match TcpStream::connect(upstream).await {
49+
Ok(stream) => stream,
50+
Err(err) => {
51+
tracing::warn!(?err, %upstream, "freeze proxy upstream connect failed");
52+
return;
53+
}
54+
};
55+
let _ = server.set_nodelay(true);
56+
let (client_r, client_w) = client.into_split();
57+
let (server_r, server_w) = server.into_split();
58+
tokio::spawn(forward(client_r, server_w, frozen.clone()));
59+
tokio::spawn(forward(server_r, client_w, frozen));
60+
});
61+
}
62+
}
63+
});
64+
65+
Ok(Self {
66+
listen_addr,
67+
frozen,
68+
})
69+
}
70+
71+
pub fn endpoint(&self) -> String {
72+
format!("http://{}", self.listen_addr)
73+
}
74+
75+
/// Stops shuttling bytes between the two peers and starts swallowing EOFs so neither peer
76+
/// learns that the other has hung up. Bytes already in flight before this call may still
77+
/// reach the other side.
78+
pub fn freeze(&self) {
79+
self.frozen.store(true, Ordering::SeqCst);
80+
}
81+
}
82+
83+
async fn forward(mut src: OwnedReadHalf, mut dst: OwnedWriteHalf, frozen: Arc<AtomicBool>) {
84+
let mut buf = vec![0u8; 8192];
85+
loop {
86+
match src.read(&mut buf).await {
87+
Ok(0) => {
88+
if frozen.load(Ordering::SeqCst) {
89+
// Hold the destination open: the peer's own send/recv buffer keeps it
90+
// believing the connection is alive, with no FIN ever delivered.
91+
std::future::pending::<()>().await;
92+
}
93+
return;
94+
}
95+
Ok(n) => {
96+
if frozen.load(Ordering::SeqCst) {
97+
// Drain-and-discard so the sender's TCP window stays open and it does not
98+
// notice the link is dead via back-pressure.
99+
continue;
100+
}
101+
if dst.write_all(&buf[..n]).await.is_err() {
102+
return;
103+
}
104+
}
105+
Err(_) => return,
106+
}
107+
}
108+
}

engine/packages/engine/tests/common/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
pub mod actors;
44
pub mod api;
55
pub mod ctx;
6+
pub mod freeze_proxy;
67
pub mod test_envoy;
78
pub mod test_helpers;
89
pub mod test_runner;

engine/packages/engine/tests/common/test_envoy.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,14 @@ impl Envoy {
254254
}
255255
}
256256

257+
pub async fn is_ping_healthy(&self) -> Option<bool> {
258+
self.handle
259+
.lock()
260+
.await
261+
.as_ref()
262+
.map(|handle| handle.is_ping_healthy())
263+
}
264+
257265
pub async fn shutdown(&self) {
258266
if let Some(handle) = self.handle.lock().await.take() {
259267
handle.shutdown_and_wait(false).await;
@@ -299,7 +307,10 @@ impl TestEnvoyCallbacks {
299307

300308
impl rivet_test_envoy::EnvoyCallbacks for TestEnvoyCallbacks {
301309
fn on_connect(&self, _handle: EnvoyHandle) {
302-
let _ = self.inner.connection_tx.send(EnvoyConnectionEvent::Connected);
310+
let _ = self
311+
.inner
312+
.connection_tx
313+
.send(EnvoyConnectionEvent::Connected);
303314
}
304315

305316
fn on_disconnect(&self, _handle: EnvoyHandle) {

engine/packages/engine/tests/envoy/network_faults.rs

Lines changed: 103 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ use super::super::common;
88
#[test]
99
fn envoy_reconnects_after_server_side_tcp_reset() {
1010
common::run(
11-
common::TestOpts::new(1).with_timeout(90).with_network_faults(),
11+
common::TestOpts::new(1)
12+
.with_timeout(90)
13+
.with_network_faults(),
1214
|ctx| async move {
1315
let dc = ctx.leader_dc();
1416
let (namespace, _) = common::setup_test_namespace(dc).await;
@@ -71,8 +73,9 @@ fn envoy_reconnects_after_server_side_tcp_reset() {
7173
// The ping task writes every few seconds in the test config.
7274
disconnect.wait().await;
7375

74-
let reconnect = envoy
75-
.wait_for_next_connection_event(common::test_envoy::EnvoyConnectionEvent::Connected);
76+
let reconnect = envoy.wait_for_next_connection_event(
77+
common::test_envoy::EnvoyConnectionEvent::Connected,
78+
);
7679
envoy_proxy
7780
.clear_toxics()
7881
.await
@@ -90,6 +93,100 @@ fn envoy_reconnects_after_server_side_tcp_reset() {
9093
);
9194
}
9295

96+
/// Black-holes the envoy <-> engine link and checks three things: the engine gives up through
/// its ping timeout, the envoy-client observes no disconnect, and the envoy-client's ping
/// health nevertheless turns unhealthy.
#[test]
fn engine_closes_envoy_ws_after_ping_timeout_while_envoy_remains_unaware() {
    common::run(
        common::TestOpts::new(1).with_timeout(120),
        |ctx| async move {
            let dc = ctx.leader_dc();
            let (namespace, _) = common::setup_test_namespace(dc).await;

            // Route the envoy through our own forwarder so a true network partition can be
            // simulated. Toxiproxy can stall traffic but always relays a peer's TCP close to
            // the other side, which would let envoy-client notice the engine has hung up.
            let upstream = std::net::SocketAddr::from(([127, 0, 0, 1], dc.guard_port()));
            let freeze_proxy = common::freeze_proxy::FreezeProxy::start(upstream)
                .await
                .expect("failed to start freeze proxy");

            let envoy = common::setup_envoy(dc, &namespace, |builder| {
                builder
                    .with_endpoint(freeze_proxy.endpoint())
                    .with_actor_behavior("network-fault-actor", |_| {
                        let behavior = common::test_envoy::CustomActorBuilder::new()
                            .on_start(|_| {
                                Box::pin(async {
                                    Ok(common::test_envoy::ActorStartResult::Running)
                                })
                            })
                            .build();
                        Box::new(behavior)
                    })
            })
            .await;

            let created = common::create_actor(
                dc.guard_port(),
                &namespace,
                "network-fault-actor",
                envoy.pool_name(),
                rivet_types::actors::CrashPolicy::Sleep,
            )
            .await;
            let actor_id = created.actor.actor_id.to_string();
            wait_for_envoy_actor(&envoy, &actor_id).await;
            wait_for_connectable(dc.guard_port(), &namespace, &actor_id).await;

            // Sanity check: the actor answers before any fault is injected.
            let ping = common::ping_actor_via_guard(dc, &actor_id).await;
            assert_eq!(ping["status"], "ok");

            // Subscribe before injecting the fault so no disconnect event can slip past us.
            let mut disconnect = envoy.wait_for_next_connection_event(
                common::test_envoy::EnvoyConnectionEvent::Disconnected,
            );
            disconnect.assert_no_event();

            // Black-hole the link in both directions: bytes from both peers are read and
            // discarded, and EOFs are swallowed so neither TCP stack sees a FIN. The engine
            // keeps pinging every few seconds (default 3s) but gets no pongs, so its
            // application-level ping timeout (default 15s) eventually fires and closes the
            // WebSocket. The envoy-client does not tear its connection down on missing pings,
            // so as long as its TCP socket stays open it keeps treating the link as connected.
            freeze_proxy.freeze();

            // Wait comfortably past the engine's 15s ping timeout, then confirm through the
            // ping task's error log that the engine tore down its side...
            let settle = std::time::Duration::from_secs(20);
            tokio::time::sleep(settle).await;
            let logs = common::captured_logs_snapshot();
            assert!(
                logs.contains("ping task failed"),
                "expected engine to log a ping timeout, got logs:\n{logs}"
            );

            // ...while the envoy-client stays oblivious: the engine's close frame and TCP FIN
            // never reach it because the freeze proxy holds the link open on its side.
            disconnect.assert_no_event();

            // The WebSocket still looks alive to the envoy-client, but its ping tracker must
            // report unhealthy because no engine ping arrived during the last 20s. This is the
            // signal the rivetkit `/health` endpoint uses to ask its host to recycle the
            // container.
            let ping_healthy = envoy
                .is_ping_healthy()
                .await
                .expect("envoy handle should exist");
            assert!(
                !ping_healthy,
                "envoy-client should report unhealthy after 20s without an engine ping"
            );
        },
    );
}
189+
93190
async fn wait_for_envoy_actor(envoy: &common::test_envoy::TestEnvoy, actor_id: &str) {
94191
common::wait_with_poll(
95192
std::time::Duration::from_secs(5),
@@ -138,7 +235,9 @@ async fn ping_actor_via_gateway(guard_port: u16, actor_id: &str) -> serde_json::
138235
.expect("failed to build reqwest client");
139236

140237
let response = client
141-
.get(format!("http://127.0.0.1:{guard_port}/gateway/{actor_id}/ping"))
238+
.get(format!(
239+
"http://127.0.0.1:{guard_port}/gateway/{actor_id}/ping"
240+
))
142241
.send()
143242
.await
144243
.expect("failed to ping actor through gateway");

engine/packages/test-deps-docker/src/toxiproxy.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,19 @@ impl ToxiproxyProxy {
215215
.await
216216
}
217217

218+
pub async fn timeout_upstream(&self, timeout_ms: u64, toxicity: f32) -> Result<()> {
219+
self.add_toxic(
220+
"timeout-upstream",
221+
"timeout",
222+
ToxiproxyDirection::Upstream,
223+
toxicity,
224+
TimeoutAttributes {
225+
timeout: timeout_ms,
226+
},
227+
)
228+
.await
229+
}
230+
218231
pub async fn bandwidth_downstream(&self, kbps: u64) -> Result<()> {
219232
self.add_toxic(
220233
"bandwidth-downstream",

engine/sdks/rust/envoy-client/src/actor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1660,6 +1660,7 @@ mod tests {
16601660
)),
16611661
protocol_metadata: Arc::new(tokio::sync::Mutex::new(None)),
16621662
shutting_down: std::sync::atomic::AtomicBool::new(false),
1663+
last_ping_ts: std::sync::atomic::AtomicI64::new(rivet_util::timestamp::now()),
16631664
stopped_tx: tokio::sync::watch::channel(true).0,
16641665
});
16651666
(shared, envoy_rx)

engine/sdks/rust/envoy-client/src/context.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::collections::HashMap;
22
use std::sync::Arc;
33
use std::sync::Mutex as StdMutex;
4-
use std::sync::atomic::AtomicBool;
4+
use std::sync::atomic::{AtomicBool, AtomicI64};
55

66
use crate::async_counter::AsyncCounter;
77
use rivet_envoy_protocol as protocol;
@@ -32,6 +32,11 @@ pub struct SharedContext {
3232
pub ws_tx: Arc<Mutex<Option<mpsc::UnboundedSender<WsTxMessage>>>>,
3333
pub protocol_metadata: Arc<Mutex<Option<protocol::ProtocolMetadata>>>,
3434
pub shutting_down: AtomicBool,
35+
/// Epoch ms timestamp of the most recent ping packet received from the engine. Used by
36+
/// `EnvoyHandle::is_ping_healthy` to surface a dead engine link to upstream health checks.
37+
/// Initialized to the construction time so a freshly created envoy reports healthy until
38+
/// its first ping arrives or the threshold elapses without one.
39+
pub last_ping_ts: AtomicI64,
3540
// Latched signal fired by `envoy_loop` after its cleanup block completes.
3641
// Waiters observing `true` are guaranteed that the loop has exited and
3742
// every pending KV/SQLite request has been resolved (with `EnvoyShutdownError`

engine/sdks/rust/envoy-client/src/envoy.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,7 @@ fn start_envoy_sync_inner(config: EnvoyConfig) -> EnvoyHandle {
306306
ws_tx: Arc::new(tokio::sync::Mutex::new(None)),
307307
protocol_metadata: Arc::new(tokio::sync::Mutex::new(None)),
308308
shutting_down: std::sync::atomic::AtomicBool::new(false),
309+
last_ping_ts: std::sync::atomic::AtomicI64::new(rivet_util::timestamp::now()),
309310
stopped_tx,
310311
});
311312

0 commit comments

Comments
 (0)