Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 255 additions & 20 deletions Cargo.lock

Large diffs are not rendered by default.

27 changes: 26 additions & 1 deletion engine/packages/engine/tests/common/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
pub timeout_secs: u64,
pub pegboard_outbound: bool,
pub auth_admin_token: Option<String>,
pub network_faults: bool,
}

impl TestOpts {
Expand All @@ -73,6 +74,7 @@
timeout_secs: 10,
pegboard_outbound: false,
auth_admin_token: None,
network_faults: false,
}
}

Expand All @@ -90,6 +92,11 @@
self.auth_admin_token = Some(token.into());
self
}

pub fn with_network_faults(mut self) -> Self {
self.network_faults = true;
self
}
}

impl Default for TestOpts {
Expand All @@ -99,13 +106,15 @@
timeout_secs: 10,
pegboard_outbound: false,
auth_admin_token: None,
network_faults: false,
}
}
}

pub struct TestCtx {
dcs: Vec<TestDatacenter>,
pub opts: TestOpts,
network_faults: Option<rivet_test_deps::ToxiproxyTestServer>,
}

pub struct TestDatacenter {
Expand Down Expand Up @@ -151,13 +160,23 @@
Self::setup_instance(
test_deps,
opts.pegboard_outbound,
opts.auth_admin_token.clone(),

Check warning on line 163 in engine/packages/engine/tests/common/ctx.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/engine/packages/engine/tests/common/ctx.rs

Check warning on line 163 in engine/packages/engine/tests/common/ctx.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/engine/packages/engine/tests/common/ctx.rs

Check warning on line 163 in engine/packages/engine/tests/common/ctx.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/engine/packages/engine/tests/common/ctx.rs
)
});
let mut dcs: Vec<TestDatacenter> = futures_util::future::try_join_all(setup_futures).await?;
dcs.sort_by_key(|dc| dc.config.dc_label());

Ok(Self { dcs, opts })
let network_faults = if opts.network_faults {
Some(rivet_test_deps::ToxiproxyTestServer::start().await?)
} else {
None
};

Ok(Self {
dcs,
opts,
network_faults,
})
}

async fn setup_instance(
Expand Down Expand Up @@ -262,6 +281,12 @@
.unwrap_or_else(|| panic!("No datacenter found with label {}", label))
}

pub fn network_faults(&self) -> &rivet_test_deps::ToxiproxyTestServer {
self.network_faults
.as_ref()
.expect("Network faults were not enabled. Use TestOpts::with_network_faults().")
}

pub async fn shutdown(self) {
tracing::info!("shutting down multi-DC test context");
for dc in self.dcs {
Expand Down
80 changes: 79 additions & 1 deletion engine/packages/engine/tests/common/test_envoy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,46 @@

pub type TestEnvoy = Envoy;

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EnvoyConnectionEvent {
Connected,
Disconnected,
}

pub struct EnvoyConnectionEventWaiter {
rx: broadcast::Receiver<EnvoyConnectionEvent>,
expected: EnvoyConnectionEvent,
timeout: std::time::Duration,
}

impl EnvoyConnectionEventWaiter {
pub fn assert_no_event(&mut self) {
match self.rx.try_recv() {
Err(tokio::sync::broadcast::error::TryRecvError::Empty) => {}
Ok(event) => panic!("unexpected Envoy connection event before fault: {event:?}"),
Err(tokio::sync::broadcast::error::TryRecvError::Lagged(count)) => {
panic!("missed {count} Envoy connection events before fault")
}
Err(err) => panic!("Envoy connection event channel closed: {err}"),
}
}

pub async fn wait(mut self) {
tokio::time::timeout(self.timeout, async {
loop {
match self.rx.recv().await {
Ok(event) if event == self.expected => break,
Ok(_) => {}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
Err(err) => panic!("Envoy connection event channel closed: {err}"),
}
}
})
.await
.expect("timed out waiting for Envoy connection event");
}
}

#[derive(Clone)]
pub struct EnvoyConfig {
endpoint: String,
Expand Down Expand Up @@ -119,6 +159,7 @@
actor_factories: self.actor_factories,
actors: tokio::sync::Mutex::new(HashMap::new()),
lifecycle_tx,
connection_tx: broadcast::channel(100).0,
}),
handle: tokio::sync::Mutex::new(None),
envoy_key: uuid::Uuid::new_v4().to_string(),
Expand All @@ -130,6 +171,7 @@
actor_factories: HashMap<String, ActorFactory>,
actors: tokio::sync::Mutex<HashMap<String, Box<dyn TestActor>>>,
lifecycle_tx: broadcast::Sender<ActorLifecycleEvent>,
connection_tx: broadcast::Sender<EnvoyConnectionEvent>,
}

pub struct Envoy {
Expand Down Expand Up @@ -197,6 +239,21 @@
self.inner.lifecycle_tx.subscribe()
}

pub fn subscribe_connection_events(&self) -> broadcast::Receiver<EnvoyConnectionEvent> {
self.inner.connection_tx.subscribe()
}

pub fn wait_for_next_connection_event(
&self,
expected: EnvoyConnectionEvent,
) -> EnvoyConnectionEventWaiter {
EnvoyConnectionEventWaiter {
rx: self.subscribe_connection_events(),
expected,
timeout: std::time::Duration::from_secs(20),
}
}

pub async fn shutdown(&self) {
if let Some(handle) = self.handle.lock().await.take() {
handle.shutdown_and_wait(false).await;
Expand Down Expand Up @@ -239,8 +296,19 @@
}
}
}

Check warning on line 299 in engine/packages/engine/tests/common/test_envoy.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/engine/packages/engine/tests/common/test_envoy.rs
impl rivet_test_envoy::EnvoyCallbacks for TestEnvoyCallbacks {
fn on_connect(&self, _handle: EnvoyHandle) {
let _ = self.inner.connection_tx.send(EnvoyConnectionEvent::Connected);
}

fn on_disconnect(&self, _handle: EnvoyHandle) {
let _ = self
.inner
.connection_tx
.send(EnvoyConnectionEvent::Disconnected);
}

fn on_actor_start(
&self,
handle: EnvoyHandle,
Expand Down Expand Up @@ -406,7 +474,7 @@
_request: &HttpRequest,
) -> BoxFuture<Result<bool>> {
Box::pin(async { Ok(false) })
}

Check warning on line 477 in engine/packages/engine/tests/common/test_envoy.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/engine/packages/engine/tests/common/test_envoy.rs
}

fn spawn_event_bridge(
Expand All @@ -416,7 +484,7 @@
tokio::spawn(async move {
while let Some(event) = event_rx.recv().await {
match event.event {
rivet_runner_protocol::mk2::Event::EventActorIntent(intent) => match intent.intent {

Check warning on line 487 in engine/packages/engine/tests/common/test_envoy.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/engine/packages/engine/tests/common/test_envoy.rs
rivet_runner_protocol::mk2::ActorIntent::ActorIntentSleep => {
handle.sleep_actor(event.actor_id, Some(event.generation));
}
Expand Down Expand Up @@ -491,7 +559,7 @@
metadata: Vec::new(),
},
)
})

Check warning on line 562 in engine/packages/engine/tests/common/test_envoy.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/engine/packages/engine/tests/common/test_envoy.rs
}
rivet_runner_protocol::mk2::KvRequestData::KvPutRequest(body) => handle
.kv_put(req.actor_id, body.keys.into_iter().zip(body.values).collect())
Expand Down Expand Up @@ -526,6 +594,7 @@
namespace: String,
pool_name: String,
version: u32,
endpoint: Option<String>,
actor_factories: HashMap<String, ActorFactory>,
}

Expand All @@ -535,6 +604,7 @@
namespace: namespace.to_string(),
pool_name: "test-envoy".to_string(),
version: 1,
endpoint: None,
actor_factories: HashMap::new(),
}
}
Expand All @@ -549,6 +619,11 @@
self
}

pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
self.endpoint = Some(endpoint.into());
self
}

pub fn with_actor_behavior<F>(mut self, actor_name: &str, factory: F) -> Self
where
F: Fn(ActorConfig) -> Box<dyn TestActor> + Send + Sync + 'static,
Expand All @@ -560,7 +635,10 @@

pub async fn build(self, dc: &super::TestDatacenter) -> Result<Envoy> {
let config = EnvoyConfig::builder()
.endpoint(format!("http://127.0.0.1:{}", dc.guard_port()))
.endpoint(
self.endpoint
.unwrap_or_else(|| format!("http://127.0.0.1:{}", dc.guard_port())),
)
.token("dev")
.namespace(&self.namespace)
.pool_name(&self.pool_name)
Expand Down
24 changes: 23 additions & 1 deletion engine/packages/engine/tests/common/test_helpers.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,31 @@
use std::{collections::HashMap, str::FromStr};
use std::{collections::HashMap, future::Future, str::FromStr};

use serde_json::json;

use super::TestDatacenter;

pub async fn wait_with_poll<T, F, Fut>(
timeout: std::time::Duration,
poll_interval: std::time::Duration,
mut check: F,
) -> Option<T>
where
F: FnMut() -> Fut,
Fut: Future<Output = Option<T>>,
{
tokio::time::timeout(timeout, async {
loop {
if let Some(value) = check().await {
break value;
}

tokio::time::sleep(poll_interval).await;
}
})
.await
.ok()
}

// Namespace helpers
pub async fn setup_test_namespace(leader_dc: &TestDatacenter) -> (String, rivet_util::Id) {
let random_suffix = rand::random::<u16>();
Expand Down
1 change: 1 addition & 0 deletions engine/packages/engine/tests/envoy/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod auth;
pub mod network_faults;
pub mod actors_alarm;
pub mod actors_kv_crud;
pub mod actors_kv_delete_range;
Expand Down
Loading
Loading