Skip to content

Commit 72cbc7c

Browse files
MasterPtatoNathanFlurry
authored andcommitted
feat(envoy-client): add ping health check
1 parent ea3cb2a commit 72cbc7c

16 files changed

Lines changed: 338 additions & 31 deletions

File tree

CLAUDE.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ Design constraints, invariants, and reference commands for the Rivet monorepo. F
2323
**Always use versioned BARE (`vbare`) instead of raw `serde_bare` for any persisted or wire-format encoding unless explicitly told otherwise.** Raw `serde_bare::to_vec` / `from_slice` has no version header, so any future schema change forces hand-rolled `LegacyXxx` fallback structs. `vbare::OwnedVersionedData` plus a versioned `*.bare` schema is the standard pattern. Acceptable raw-bare exceptions: ephemeral in-memory encodings that never cross a process boundary or hit disk, and wire formats whose protocol version is coordinated out-of-band (e.g. an HTTP path like `/v{PROTOCOL_VERSION}/...` or another channel that pins both peers to one schema per call).
2424

2525
- Avoid raw `f64` fields in vbare protocol schemas that use hashable maps; generated Rust derives `Eq`/`Hash`, so encode floats as fixed bytes or an ordered wrapper.
26-
- Version converters must manually map fields between versions; never convert by serializing one version and deserializing it as another.
26+
- Version converters must manually map fields between versions; never use serialize-deserialize round trips such as `transcode_version` or `serde_bare::to_vec` plus `from_slice`.
2727

2828
When talking about "Rivet Actors" make sure to capitalize "Rivet Actor" as a proper noun and lowercase "actor" as a generic noun.
2929

Cargo.lock

Lines changed: 15 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
//! Minimal TCP forwarder with a `freeze` mode used by network-fault tests.
2+
//!
3+
//! Toxiproxy can stall traffic but always relays a peer's TCP close to the other side, which
4+
//! defeats tests that need to observe one peer's behavior when it has no signal that the other
5+
//! peer hung up. `FreezeProxy` is a single-purpose forwarder that supports a true black-hole
6+
//! mode: while frozen, bytes are read from each peer and discarded, and an EOF from either peer
7+
//! is held instead of being forwarded.
8+
9+
use std::net::SocketAddr;
10+
use std::sync::Arc;
11+
use std::sync::atomic::{AtomicBool, Ordering};
12+
13+
use anyhow::{Context, Result};
14+
use tokio::io::AsyncReadExt;
15+
use tokio::io::AsyncWriteExt;
16+
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
17+
use tokio::net::{TcpListener, TcpStream};
18+
19+
pub struct FreezeProxy {
20+
listen_addr: SocketAddr,
21+
frozen: Arc<AtomicBool>,
22+
}
23+
24+
impl FreezeProxy {
25+
pub async fn start(upstream: SocketAddr) -> Result<Self> {
26+
let listener = TcpListener::bind("127.0.0.1:0")
27+
.await
28+
.context("failed to bind FreezeProxy listener")?;
29+
let listen_addr = listener
30+
.local_addr()
31+
.context("failed to read FreezeProxy listen addr")?;
32+
let frozen = Arc::new(AtomicBool::new(false));
33+
34+
tokio::spawn({
35+
let frozen = frozen.clone();
36+
async move {
37+
loop {
38+
let (client, _) = match listener.accept().await {
39+
Ok(pair) => pair,
40+
Err(err) => {
41+
tracing::warn!(?err, "freeze proxy accept failed");
42+
return;
43+
}
44+
};
45+
let _ = client.set_nodelay(true);
46+
let frozen = frozen.clone();
47+
tokio::spawn(async move {
48+
let server = match TcpStream::connect(upstream).await {
49+
Ok(stream) => stream,
50+
Err(err) => {
51+
tracing::warn!(?err, %upstream, "freeze proxy upstream connect failed");
52+
return;
53+
}
54+
};
55+
let _ = server.set_nodelay(true);
56+
let (client_r, client_w) = client.into_split();
57+
let (server_r, server_w) = server.into_split();
58+
tokio::spawn(forward(client_r, server_w, frozen.clone()));
59+
tokio::spawn(forward(server_r, client_w, frozen));
60+
});
61+
}
62+
}
63+
});
64+
65+
Ok(Self {
66+
listen_addr,
67+
frozen,
68+
})
69+
}
70+
71+
pub fn endpoint(&self) -> String {
72+
format!("http://{}", self.listen_addr)
73+
}
74+
75+
/// Stops shuttling bytes between the two peers and starts swallowing EOFs so neither peer
76+
/// learns that the other has hung up. Bytes already in flight before this call may still
77+
/// reach the other side.
78+
pub fn freeze(&self) {
79+
self.frozen.store(true, Ordering::SeqCst);
80+
}
81+
}
82+
83+
async fn forward(mut src: OwnedReadHalf, mut dst: OwnedWriteHalf, frozen: Arc<AtomicBool>) {
84+
let mut buf = vec![0u8; 8192];
85+
loop {
86+
match src.read(&mut buf).await {
87+
Ok(0) => {
88+
if frozen.load(Ordering::SeqCst) {
89+
// Hold the destination open: the peer's own send/recv buffer keeps it
90+
// believing the connection is alive, with no FIN ever delivered.
91+
std::future::pending::<()>().await;
92+
}
93+
return;
94+
}
95+
Ok(n) => {
96+
if frozen.load(Ordering::SeqCst) {
97+
// Drain-and-discard so the sender's TCP window stays open and it does not
98+
// notice the link is dead via back-pressure.
99+
continue;
100+
}
101+
if dst.write_all(&buf[..n]).await.is_err() {
102+
return;
103+
}
104+
}
105+
Err(_) => return,
106+
}
107+
}
108+
}

engine/packages/engine/tests/common/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
pub mod actors;
44
pub mod api;
55
pub mod ctx;
6+
pub mod freeze_proxy;
67
pub mod test_envoy;
78
pub mod test_helpers;
89
pub mod test_runner;

engine/packages/engine/tests/common/test_envoy.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,14 @@ impl Envoy {
254254
}
255255
}
256256

257+
pub async fn is_ping_healthy(&self) -> Option<bool> {
258+
self.handle
259+
.lock()
260+
.await
261+
.as_ref()
262+
.map(|handle| handle.is_ping_healthy())
263+
}
264+
257265
pub async fn shutdown(&self) {
258266
if let Some(handle) = self.handle.lock().await.take() {
259267
handle.shutdown_and_wait(false).await;
@@ -299,7 +307,10 @@ impl TestEnvoyCallbacks {
299307

300308
impl rivet_test_envoy::EnvoyCallbacks for TestEnvoyCallbacks {
301309
fn on_connect(&self, _handle: EnvoyHandle) {
302-
let _ = self.inner.connection_tx.send(EnvoyConnectionEvent::Connected);
310+
let _ = self
311+
.inner
312+
.connection_tx
313+
.send(EnvoyConnectionEvent::Connected);
303314
}
304315

305316
fn on_disconnect(&self, _handle: EnvoyHandle) {

engine/packages/engine/tests/envoy/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,4 @@ pub mod api_actors_get_or_create;
1111
pub mod api_actors_list;
1212
pub mod api_actors_list_names;
1313
pub mod auth;
14-
pub mod auth;
1514
pub mod network_faults;

engine/packages/engine/tests/envoy/network_faults.rs

Lines changed: 117 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,17 @@ use std::sync::{
55

66
use super::super::common;
77

8+
const ENVOY_PING_HEALTHY_THRESHOLD: std::time::Duration = std::time::Duration::from_millis(
9+
common::test_envoy::EnvoyHandle::PING_HEALTHY_THRESHOLD_MS as u64,
10+
);
11+
const ENVOY_PING_INTERVAL_MARGIN: std::time::Duration = std::time::Duration::from_secs(5);
12+
813
#[test]
914
fn envoy_reconnects_after_server_side_tcp_reset() {
1015
common::run(
11-
common::TestOpts::new(1).with_timeout(90).with_network_faults(),
16+
common::TestOpts::new(1)
17+
.with_timeout(90)
18+
.with_network_faults(),
1219
|ctx| async move {
1320
let dc = ctx.leader_dc();
1421
let (namespace, _) = common::setup_test_namespace(dc).await;
@@ -55,7 +62,7 @@ fn envoy_reconnects_after_server_side_tcp_reset() {
5562
wait_for_envoy_actor(&envoy, &actor_id).await;
5663
wait_for_connectable(dc.guard_port(), &namespace, &actor_id).await;
5764

58-
let response = common::ping_actor_via_guard(dc, &actor_id).await;
65+
let response = ping_actor_via_gateway(dc.guard_port(), &actor_id).await;
5966
assert_eq!(response["status"], "ok");
6067

6168
let mut disconnect = envoy.wait_for_next_connection_event(
@@ -71,8 +78,9 @@ fn envoy_reconnects_after_server_side_tcp_reset() {
7178
// The ping task writes every few seconds in the test config.
7279
disconnect.wait().await;
7380

74-
let reconnect = envoy
75-
.wait_for_next_connection_event(common::test_envoy::EnvoyConnectionEvent::Connected);
81+
let reconnect = envoy.wait_for_next_connection_event(
82+
common::test_envoy::EnvoyConnectionEvent::Connected,
83+
);
7684
envoy_proxy
7785
.clear_toxics()
7886
.await
@@ -90,6 +98,108 @@ fn envoy_reconnects_after_server_side_tcp_reset() {
9098
);
9199
}
92100

101+
#[test]
102+
fn engine_closes_envoy_ws_after_ping_timeout_while_envoy_remains_unaware() {
103+
common::run(
104+
common::TestOpts::new(1).with_timeout(120),
105+
|ctx| async move {
106+
let dc = ctx.leader_dc();
107+
let (namespace, _) = common::setup_test_namespace(dc).await;
108+
109+
// Stand up our own forwarder so we can simulate a true network partition.
110+
// Toxiproxy can stall traffic but always relays a peer's TCP close to the other
111+
// side, which would let envoy-client notice the engine has hung up.
112+
let freeze_proxy = common::freeze_proxy::FreezeProxy::start(
113+
std::net::SocketAddr::from(([127, 0, 0, 1], dc.guard_port())),
114+
)
115+
.await
116+
.expect("failed to start freeze proxy");
117+
118+
let envoy = common::setup_envoy(dc, &namespace, |builder| {
119+
builder
120+
.with_endpoint(freeze_proxy.endpoint())
121+
.with_actor_behavior("network-fault-actor", |_| {
122+
Box::new(
123+
common::test_envoy::CustomActorBuilder::new()
124+
.on_start(|_| {
125+
Box::pin(async {
126+
Ok(common::test_envoy::ActorStartResult::Running)
127+
})
128+
})
129+
.build(),
130+
)
131+
})
132+
})
133+
.await;
134+
135+
let res = common::create_actor(
136+
dc.guard_port(),
137+
&namespace,
138+
"network-fault-actor",
139+
envoy.pool_name(),
140+
rivet_types::actors::CrashPolicy::Sleep,
141+
)
142+
.await;
143+
let actor_id = res.actor.actor_id.to_string();
144+
wait_for_envoy_actor(&envoy, &actor_id).await;
145+
wait_for_connectable(dc.guard_port(), &namespace, &actor_id).await;
146+
147+
let response = ping_actor_via_gateway(dc.guard_port(), &actor_id).await;
148+
assert_eq!(response["status"], "ok");
149+
150+
// Wait past the local health threshold before injecting the fault. A healthy
151+
// connection must stay healthy because engine pings refresh envoy-client's
152+
// receive timestamp.
153+
tokio::time::sleep(ENVOY_PING_HEALTHY_THRESHOLD + ENVOY_PING_INTERVAL_MARGIN).await;
154+
let healthy = envoy
155+
.is_ping_healthy()
156+
.await
157+
.expect("envoy handle should exist");
158+
assert!(
159+
healthy,
160+
"envoy-client should remain healthy while engine pings are arriving"
161+
);
162+
163+
// Subscribe before injecting the fault so we can assert no event slips through.
164+
let mut disconnect = envoy.wait_for_next_connection_event(
165+
common::test_envoy::EnvoyConnectionEvent::Disconnected,
166+
);
167+
disconnect.assert_no_event();
168+
169+
// Black-hole the link in both directions. Bytes are read from both peers and
170+
// discarded, and EOFs are swallowed so neither peer's TCP stack ever sees a FIN.
171+
// The engine still keeps sending pings every few seconds (default 3s) but no pongs
172+
// come back, so its application-level ping timeout (default 15s) will eventually
173+
// fire and close the WebSocket. The envoy-client has no application-level
174+
// liveness check of its own, so as long as its TCP socket stays open it continues
175+
// to believe the connection is healthy.
176+
freeze_proxy.freeze();
177+
178+
// Wait well past the engine's 15s ping timeout and the envoy-client's local ping
179+
// health threshold.
180+
tokio::time::sleep(ENVOY_PING_HEALTHY_THRESHOLD + ENVOY_PING_INTERVAL_MARGIN).await;
181+
182+
// The envoy-client is still oblivious. The engine's close frame and TCP FIN never
183+
// reach it because the freeze proxy is holding the link open from envoy-client's
184+
// perspective.
185+
disconnect.assert_no_event();
186+
187+
// Even though the envoy-client thinks the WebSocket is alive, its own ping-tracker
188+
// must report unhealthy because no engine ping arrived in the last 20s. This is
189+
// the signal the rivetkit `/health` endpoint uses to ask its host to recycle the
190+
// container.
191+
let healthy = envoy
192+
.is_ping_healthy()
193+
.await
194+
.expect("envoy handle should exist");
195+
assert!(
196+
!healthy,
197+
"envoy-client should report unhealthy after 20s without an engine ping"
198+
);
199+
},
200+
);
201+
}
202+
93203
async fn wait_for_envoy_actor(envoy: &common::test_envoy::TestEnvoy, actor_id: &str) {
94204
common::wait_with_poll(
95205
std::time::Duration::from_secs(5),
@@ -138,7 +248,9 @@ async fn ping_actor_via_gateway(guard_port: u16, actor_id: &str) -> serde_json::
138248
.expect("failed to build reqwest client");
139249

140250
let response = client
141-
.get(format!("http://127.0.0.1:{guard_port}/gateway/{actor_id}/ping"))
251+
.get(format!(
252+
"http://127.0.0.1:{guard_port}/gateway/{actor_id}/ping"
253+
))
142254
.send()
143255
.await
144256
.expect("failed to ping actor through gateway");

0 commit comments

Comments
 (0)