Skip to content

Commit 1109409

Browse files
committed
test(engine): cover envoy Sleep crash policy wake and recovery paths
1 parent ee84fb7 commit 1109409

7 files changed

Lines changed: 286 additions & 25 deletions

File tree

engine/packages/engine/tests/envoy/actors_alarm.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -867,6 +867,8 @@ fn alarm_in_the_past() {
867867
})
868868
.await;
869869

870+
let lifecycle_rx = runner.subscribe_lifecycle_events();
871+
870872
let res = common::create_actor(
871873
ctx.leader_dc().guard_port(),
872874
&namespace,
@@ -881,21 +883,15 @@ fn alarm_in_the_past() {
881883
// Wait for actor to be ready (gen 2)
882884
ready_rx.await.expect("actor should send ready signal");
883885

884-
// Actor sets alarm in the past and sleeps
885-
wait_for_actor_sleep(ctx.leader_dc().guard_port(), &actor_id, &namespace, 5)
886-
.await
887-
.expect("actor should be asleep");
888-
889-
// The past alarm should fire immediately, waking the actor
890-
wait_for_actor_wake_polling(ctx.leader_dc().guard_port(), &actor_id, &namespace, 2)
886+
// A past alarm can wake the actor before the API observes sleep_ts.
887+
wait_for_actor_wake_from_alarm(lifecycle_rx, &actor_id, 2, 5)
891888
.await
892889
.expect("actor should wake immediately from past alarm");
893890

894-
// Verify actor is awake at gen 2
895-
let actor = common::try_get_actor(ctx.leader_dc().guard_port(), &actor_id, &namespace)
891+
// Verify the API view has caught up and the actor is awake at gen 2.
892+
let actor = wait_for_actor_wake_polling(ctx.leader_dc().guard_port(), &actor_id, &namespace, 2)
896893
.await
897-
.expect("failed to get actor")
898-
.expect("actor should exist");
894+
.expect("actor should be awake");
899895

900896
assert!(actor.sleep_ts.is_none(), "actor should be awake");
901897
assert!(

engine/packages/engine/tests/envoy/actors_lifecycle.rs

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -987,6 +987,229 @@ fn envoy_crash_policy_sleep() {
987987
});
988988
}
989989

990+
// Fires the request in the background so a wake-then-crash race cannot panic
991+
// the test via `ping_actor_via_guard`'s non-200 assertion. The wake itself is
992+
// what we're verifying, and we observe it through `connectable_ts` polling.
993+
// Errors and non-2xx responses are logged so a wake that never reaches the
994+
// guard surfaces as a diagnostic instead of a silent 30s timeout.
995+
fn spawn_wake_ping(ctx: &common::TestCtx, actor_id: String) {
996+
let guard_port = ctx.leader_dc().guard_port();
997+
tokio::spawn(async move {
998+
let client = reqwest::Client::new();
999+
match client
1000+
.get(format!("http://127.0.0.1:{}/ping", guard_port))
1001+
.header("X-Rivet-Target", "actor")
1002+
.header("X-Rivet-Actor", &actor_id)
1003+
.send()
1004+
.await
1005+
{
1006+
Ok(res) if !res.status().is_success() => {
1007+
tracing::warn!(
1008+
%actor_id,
1009+
status = %res.status(),
1010+
"wake ping returned non-success"
1011+
);
1012+
}
1013+
Err(err) => {
1014+
tracing::warn!(%actor_id, ?err, "wake ping failed");
1015+
}
1016+
_ => {}
1017+
}
1018+
});
1019+
}
1020+
1021+
// Verifies that an actor put to sleep by the Sleep crash policy is woken by
// an incoming request and becomes connectable on its second start.
//
// Both polling phases are bounded at 30s each, which fits inside the 75s
// harness timeout, so a stall in either phase fails with the last observed
// actor state rather than with a bare harness timeout.
#[test]
fn envoy_crash_policy_sleep_wakes_on_request() {
    common::run(common::TestOpts::new(1).with_timeout(75), |ctx| async move {
        let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await;
        let guard_port = ctx.leader_dc().guard_port();
        let phase_timeout = std::time::Duration::from_secs(30);

        let crash_count = Arc::new(Mutex::new(0));

        // Actor crashes on first start, then succeeds. Sleep policy puts it to
        // sleep after the crash; an incoming request should reallocate and run
        // the now-successful start path.
        let envoy = common::setup_envoy(ctx.leader_dc(), &namespace, |builder| {
            builder.with_actor_behavior("crash-then-succeed-actor", move |_| {
                Box::new(common::test_envoy::CrashNTimesThenSucceedActor::new(
                    1,
                    crash_count.clone(),
                ))
            })
        })
        .await;

        let res = common::create_actor(
            guard_port,
            &namespace,
            "crash-then-succeed-actor",
            envoy.pool_name(),
            rivet_types::actors::CrashPolicy::Sleep,
        )
        .await;

        let actor_id_str = res.actor.actor_id.to_string();

        // Wait for crash-induced sleep: `sleep_ts` set and `connectable_ts`
        // cleared. Bounded so a crash that never turns into a sleep reports
        // the actor state here.
        let sleep_wait_start = std::time::Instant::now();
        loop {
            let actor = common::try_get_actor(guard_port, &actor_id_str, &namespace)
                .await
                .expect("failed to get actor")
                .expect("actor should exist");
            if actor.sleep_ts.is_some() && actor.connectable_ts.is_none() {
                break;
            }
            if sleep_wait_start.elapsed() > phase_timeout {
                panic!(
                    "actor should sleep after crashing under Sleep policy; last actor: {:?}",
                    actor
                );
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        }

        tracing::info!(?actor_id_str, "actor sleeping after crash, sending request");

        spawn_wake_ping(&ctx, actor_id_str.clone());

        // Verify the actor wakes and becomes connectable on the second start.
        // Leaving this loop is itself the proof of connectability, so no
        // separate assertion follows it.
        let wake_wait_start = std::time::Instant::now();
        loop {
            let actor = common::try_get_actor(guard_port, &actor_id_str, &namespace)
                .await
                .expect("failed to get actor")
                .expect("actor should exist");
            if actor.connectable_ts.is_some() {
                break;
            }
            if wake_wait_start.elapsed() > phase_timeout {
                panic!(
                    "actor should wake from sleep on incoming request; last actor: {:?}",
                    actor
                );
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }

        tracing::info!(?actor_id_str, "actor woke from crash-induced sleep");
    });
}
1097+
1098+
// Verifies that the Sleep crash policy recovers when the actor crashes again
// after being woken: the actor must end up connectable after exactly two
// crashes.
//
// Both polling phases are bounded at 30s each, which fits inside the 90s
// harness timeout, so a stall in either phase fails with the last observed
// actor state rather than with a bare harness timeout.
#[test]
fn envoy_crash_policy_sleep_recovers_after_wake_crash() {
    common::run(common::TestOpts::new(1).with_timeout(90), |ctx| async move {
        let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await;
        let guard_port = ctx.leader_dc().guard_port();
        let phase_timeout = std::time::Duration::from_secs(30);

        // One handle moves into the actor factory; the other stays here so
        // the final crash count can be asserted.
        let crash_count = Arc::new(Mutex::new(0));
        let actor_crash_count = crash_count.clone();

        // Actor crashes on the first two starts then succeeds. With Sleep
        // policy, the first crash sleeps the actor; the wake (via incoming
        // request) lets it crash again and re-attempt. This verifies the
        // recovery path tolerates additional crashes after wake without
        // abandoning the actor — eventual success is what the test asserts on,
        // not strict per-cycle sleep observability (the workflow may quickly
        // retry inside the wake path before falling back to another sleep).
        let envoy = common::setup_envoy(ctx.leader_dc(), &namespace, |builder| {
            builder.with_actor_behavior("crash-twice-actor", move |_| {
                Box::new(common::test_envoy::CrashNTimesThenSucceedActor::new(
                    2,
                    actor_crash_count.clone(),
                ))
            })
        })
        .await;

        let res = common::create_actor(
            guard_port,
            &namespace,
            "crash-twice-actor",
            envoy.pool_name(),
            rivet_types::actors::CrashPolicy::Sleep,
        )
        .await;

        let actor_id_str = res.actor.actor_id.to_string();

        // Wait for crash-induced sleep after the first crash. Bounded so a
        // crash that never turns into a sleep reports the actor state here.
        let sleep_wait_start = std::time::Instant::now();
        loop {
            let actor = common::try_get_actor(guard_port, &actor_id_str, &namespace)
                .await
                .expect("failed to get actor")
                .expect("actor should exist");
            if actor.sleep_ts.is_some() && actor.connectable_ts.is_none() {
                break;
            }
            if sleep_wait_start.elapsed() > phase_timeout {
                panic!(
                    "actor should sleep after its first crash under Sleep policy; last actor: {:?}",
                    actor
                );
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        }

        tracing::info!(?actor_id_str, "first sleep observed, waking");
        // NOTE(review): only one wake request is sent. This assumes the wake
        // path keeps retrying after the second crash — confirm.
        spawn_wake_ping(&ctx, actor_id_str.clone());

        // Verify the actor eventually reaches a connectable, running state.
        // Leaving this loop is itself the proof of connectability.
        let wake_wait_start = std::time::Instant::now();
        loop {
            let actor = common::try_get_actor(guard_port, &actor_id_str, &namespace)
                .await
                .expect("failed to get actor")
                .expect("actor should exist");
            if actor.connectable_ts.is_some() {
                break;
            }
            if wake_wait_start.elapsed() > phase_timeout {
                panic!(
                    "actor should eventually run after wake recovery; last actor: {:?}",
                    actor
                );
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }

        // The successful start must follow exactly two crashes: the initial
        // start and the start triggered by the wake.
        assert_eq!(
            *crash_count.lock().expect("crash count lock"),
            2,
            "actor should have crashed exactly twice before the successful start"
        );

        tracing::info!(?actor_id_str, "sleep + wake-with-crash recovery completed");
    });
}
1181+
1182+
// Creates an actor with `CrashPolicy::Sleep` through the public create API
// and checks that the actor read back afterwards reports the same policy.
#[test]
fn envoy_crash_policy_sleep_persists_on_creation() {
    common::run(common::TestOpts::new(1), |ctx| async move {
        // `_runner` is bound (not discarded) so it stays alive until the end
        // of the test body.
        let (namespace, _, _runner) =
            common::setup_test_namespace_with_envoy(ctx.leader_dc()).await;
        let guard_port = ctx.leader_dc().guard_port();

        let query = common::api_types::actors::create::CreateQuery {
            namespace: namespace.clone(),
        };
        let request = common::api_types::actors::create::CreateRequest {
            datacenter: None,
            name: "test-actor".to_string(),
            key: None,
            input: None,
            runner_name_selector: common::TEST_RUNNER_NAME.to_string(),
            crash_policy: rivet_types::actors::CrashPolicy::Sleep,
        };

        let created = common::api::public::actors_create(guard_port, query, request)
            .await
            .expect("failed to create actor");
        let actor_id = created.actor.actor_id.to_string();

        // Read the actor back and compare the stored policy with the one
        // sent in the create request.
        let fetched = common::assert_actor_exists(guard_port, &actor_id, &namespace).await;
        assert_eq!(fetched.crash_policy, rivet_types::actors::CrashPolicy::Sleep);
    });
}
1212+
9901213
#[ignore = "non-sleep crash policies are not yet supported for envoys"]
9911214
#[test]
9921215
fn envoy_crash_policy_destroy() {

engine/packages/pegboard/src/ops/actor/create.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ pub async fn pegboard_actor_create(ctx: &OperationCtx, input: &Input) -> Result<
6363
name: input.name.clone(),
6464
pool_name: input.runner_name_selector.clone(),
6565
key: input.key.clone(),
66+
crash_policy: input.crash_policy,
6667
namespace_id: input.namespace_id,
6768
input: input.input.clone(),
6869
from_v1: false,

engine/packages/pegboard/src/ops/actor/util.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use gas::db::WorkflowData;
22
use gas::prelude::*;
3-
use rivet_types::actors::{Actor, CrashPolicy};
3+
use rivet_types::actors::Actor;
44
use std::collections::{HashMap, HashSet};
55

66
use crate::workflows::actor::FailureReason as WorkflowFailureReason;
@@ -212,7 +212,7 @@ pub async fn build_actors_from_workflows(
212212
namespace_id: s.namespace_id,
213213
datacenter: dc_name.to_string(),
214214
runner_name_selector: s.pool_name,
215-
crash_policy: CrashPolicy::Sleep,
215+
crash_policy: s.crash_policy,
216216

217217
create_ts: s.create_ts,
218218
start_ts: s.start_ts,

engine/packages/pegboard/src/workflows/actor/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
230230
name: input.name.clone(),
231231
pool_name: input.runner_name_selector.clone(),
232232
key: input.key.clone(),
233+
crash_policy: input.crash_policy,
233234
namespace_id: input.namespace_id,
234235
input: input.input.clone(),
235236
from_v1: true,
@@ -867,6 +868,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
867868
name: input.name.clone(),
868869
pool_name: input.runner_name_selector.clone(),
869870
key: input.key.clone(),
871+
crash_policy: input.crash_policy,
870872
namespace_id: input.namespace_id,
871873
input: input.input.clone(),
872874
from_v1: true,

0 commit comments

Comments
 (0)