Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 13 additions & 160 deletions engine/packages/engine/tests/envoy/actors_alarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,69 +516,6 @@ impl Actor for AlarmOnceActor {
}
}

/// Actor that sets an alarm, sleeps on gen 2, then crashes immediately on wake.
/// Gen 1+ stays running. Used to test that alarms don't persist across generations.
struct AlarmSleepThenCrashActor {
alarm_offset_ms: i64,
sleeping_tx: tokio::sync::mpsc::UnboundedSender<u32>,
crash_tx: tokio::sync::mpsc::UnboundedSender<u32>,
}

impl AlarmSleepThenCrashActor {
fn new(
alarm_offset_ms: i64,
sleeping_tx: tokio::sync::mpsc::UnboundedSender<u32>,
crash_tx: tokio::sync::mpsc::UnboundedSender<u32>,
) -> Self {
Self {
alarm_offset_ms,
sleeping_tx,
crash_tx,
}
}
}

#[async_trait]
impl Actor for AlarmSleepThenCrashActor {
async fn on_start(&mut self, config: ActorConfig) -> anyhow::Result<ActorStartResult> {
let generation = config.generation;
tracing::info!(?config.actor_id, generation, "alarm crash actor starting");

if generation == 1 {
// First start (gen 2): set alarm, and crash
let alarm_time = get_current_timestamp_ms() + self.alarm_offset_ms;
config.send_set_alarm(alarm_time);

// Notify test
let _ = self.crash_tx.send(generation);

tracing::info!(generation, "set alarm and sleeping");
Ok(ActorStartResult::Crash {
code: 1,
message: "crashing with gen 2".to_string(),
})
} else if generation == 2 {
tracing::info!(generation, "restarted after crash, sending sleep intent");
config.send_sleep_intent();
let _ = self.sleeping_tx.send(generation);
Ok(ActorStartResult::Running)
} else {
// If it restarted again, this was not expected
//
// Keep the actor running so the test finds out we're not asleep.
Ok(ActorStartResult::Running)
}
}

async fn on_stop(&mut self) -> anyhow::Result<ActorStopResult> {
Ok(ActorStopResult::Success)
}

fn name(&self) -> &str {
"AlarmSleepThenCrashActor"
}
}

/// Actor that rapidly sets and clears alarms multiple times before sleeping (generation 2 only).
/// Used to test that rapid operations don't cause errors.
struct RapidAlarmCycleActor {
Expand Down Expand Up @@ -707,7 +644,7 @@ fn basic_alarm() {
&namespace,
"alarm-actor",
runner.pool_name(),
rivet_types::actors::CrashPolicy::Destroy,
rivet_types::actors::CrashPolicy::Sleep,
)
.await;

Expand Down Expand Up @@ -756,7 +693,7 @@ fn clear_alarm_prevents_wake() {
&namespace,
"alarm-actor",
runner.pool_name(),
rivet_types::actors::CrashPolicy::Destroy,
rivet_types::actors::CrashPolicy::Sleep,
)
.await;

Expand Down Expand Up @@ -813,7 +750,7 @@ fn replace_alarm_overwrites_previous() {
&namespace,
"alarm-actor",
runner.pool_name(),
rivet_types::actors::CrashPolicy::Destroy,
rivet_types::actors::CrashPolicy::Sleep,
)
.await;

Expand Down Expand Up @@ -872,7 +809,7 @@ fn alarm_in_the_past() {
&namespace,
"alarm-actor",
runner.pool_name(),
rivet_types::actors::CrashPolicy::Destroy,
rivet_types::actors::CrashPolicy::Sleep,
)
.await;

Expand Down Expand Up @@ -928,7 +865,7 @@ fn alarm_with_null_timestamp() {
&namespace,
"alarm-actor",
runner.pool_name(),
rivet_types::actors::CrashPolicy::Destroy,
rivet_types::actors::CrashPolicy::Sleep,
)
.await;

Expand Down Expand Up @@ -965,7 +902,6 @@ fn alarm_with_null_timestamp() {
#[test]
// Broken legacy Pegboard Runner test: full engine sweep observed the 5s alarm
// firing after 6.07s, outside the ±500ms assertion window.
#[ignore = "broken legacy Pegboard Runner test: alarm timing drifts in full engine sweep"]
fn alarm_fires_at_correct_time() {
common::run(
common::TestOpts::new(1).with_timeout(10),
Expand All @@ -988,7 +924,7 @@ fn alarm_fires_at_correct_time() {
&namespace,
"alarm-actor",
runner.pool_name(),
rivet_types::actors::CrashPolicy::Destroy,
rivet_types::actors::CrashPolicy::Sleep,
)
.await;

Expand Down Expand Up @@ -1053,7 +989,7 @@ fn multiple_alarm_sets_before_sleep() {
&namespace,
"alarm-actor",
runner.pool_name(),
rivet_types::actors::CrashPolicy::Destroy,
rivet_types::actors::CrashPolicy::Sleep,
)
.await;

Expand Down Expand Up @@ -1092,7 +1028,6 @@ fn multiple_alarm_sets_before_sleep() {
#[test]
// Broken legacy Pegboard Runner test: full engine sweep timed out in
// `multiple_sleep_wake_alarm_cycles`.
#[ignore = "broken legacy Pegboard Runner test: times out in full engine sweep"]
fn multiple_sleep_wake_alarm_cycles() {
common::run(common::TestOpts::new(1), |ctx| async move {
let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await;
Expand All @@ -1113,7 +1048,7 @@ fn multiple_sleep_wake_alarm_cycles() {
&namespace,
"alarm-actor",
runner.pool_name(),
rivet_types::actors::CrashPolicy::Destroy,
rivet_types::actors::CrashPolicy::Sleep,
)
.await;

Expand Down Expand Up @@ -1159,7 +1094,7 @@ fn alarm_wake_then_sleep_without_new_alarm() {
&namespace,
"alarm-actor",
runner.pool_name(),
rivet_types::actors::CrashPolicy::Destroy,
rivet_types::actors::CrashPolicy::Sleep,
)
.await;

Expand Down Expand Up @@ -1201,86 +1136,6 @@ fn alarm_wake_then_sleep_without_new_alarm() {

// MARK: Advanced Usage

#[ignore = "non-sleep crash policies are not yet supported for envoys"]
#[test]
fn alarm_behavior_with_crash_policy_restart() {
common::run(
common::TestOpts::new(1).with_timeout(45),
|ctx| async move {
let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await;

let (sleeping_tx, mut sleeping_rx) = tokio::sync::mpsc::unbounded_channel();
let (crash_tx, mut crash_rx) = tokio::sync::mpsc::unbounded_channel();

let runner = common::setup_envoy(ctx.leader_dc(), &namespace, |builder| {
builder.with_actor_behavior("alarm-actor", move |_| {
let sleeping_tx = sleeping_tx.clone();
let crash_tx = crash_tx.clone();
// Set alarm for 15s, crash after 500ms
Box::new(AlarmSleepThenCrashActor::new(15000, sleeping_tx, crash_tx))
})
})
.await;

let res = common::create_actor(
ctx.leader_dc().guard_port(),
&namespace,
"alarm-actor",
runner.pool_name(),
rivet_types::actors::CrashPolicy::Restart,
)
.await;

let actor_id = res.actor.actor_id.to_string();

// Wait for crash notification gen 2 sets alarm and crashes
crash_rx
.recv()
.await
.expect("should receive crash notification");

tracing::info!(
?actor_id,
"gen 2 crashed after alarm wake, waiting for gen 2 restart"
);

// Wait for actor to start sleeping again (gen 2 started and sleep)
sleeping_rx
.recv()
.await
.expect("actor should send sleep signal");

let actor =
wait_for_actor_sleep(ctx.leader_dc().guard_port(), &actor_id, &namespace, 5)
.await
.expect("actor should be sleeping");

assert!(actor.sleep_ts.is_some(), "actor should be asleep");

tracing::info!(
?actor_id,
"gen 2 is now asleep, waiting past original alarm time"
);

// Verify the next gen is awake (woke from gen 2's alarm). Use a small
// cushion over the 15s alarm offset for scheduling jitter.
let actor = wait_for_actor_wake_polling(
ctx.leader_dc().guard_port(),
&actor_id,
&namespace,
20,
)
.await
.expect("actor should wake from original alarm");

assert!(
actor.sleep_ts.is_none() && actor.connectable_ts.is_some(),
"next generation should be awake from gen 2 alarm"
);
},
);
}

#[test]
fn rapid_alarm_set_clear_cycles() {
common::run(common::TestOpts::new(1), |ctx| async move {
Expand All @@ -1303,7 +1158,7 @@ fn rapid_alarm_set_clear_cycles() {
&namespace,
"alarm-actor",
runner.pool_name(),
rivet_types::actors::CrashPolicy::Destroy,
rivet_types::actors::CrashPolicy::Sleep,
)
.await;

Expand Down Expand Up @@ -1334,7 +1189,6 @@ fn rapid_alarm_set_clear_cycles() {
// Broken legacy Pegboard Runner coverage: passes alone but fails in the full
// engine sweep under Envoy+Runner load; the full sweep reports this test failed.
#[test]
#[ignore = "broken legacy Pegboard Runner test: fails only in full engine sweep"]
fn multiple_actors_with_different_alarm_times() {
common::run(common::TestOpts::new(1), |ctx| async move {
let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await;
Expand Down Expand Up @@ -1365,7 +1219,7 @@ fn multiple_actors_with_different_alarm_times() {
&namespace,
&format!("alarm-actor-{}", idx),
runner.pool_name(),
rivet_types::actors::CrashPolicy::Destroy,
rivet_types::actors::CrashPolicy::Sleep,
)
.await;
actor_ids.push(res.actor.actor_id.to_string());
Expand Down Expand Up @@ -1399,7 +1253,6 @@ fn multiple_actors_with_different_alarm_times() {
#[test]
// Broken legacy Pegboard Runner test: times out waiting for all same-deadline
// actors to wake in the combined Envoy+Runner full engine sweep.
#[ignore = "broken legacy Pegboard Runner test: times out in full engine sweep"]
fn many_actors_same_alarm_time() {
common::run(common::TestOpts::new(1).with_timeout(45), |ctx| async move {
let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await;
Expand All @@ -1426,7 +1279,7 @@ fn many_actors_same_alarm_time() {
&namespace,
"alarm-actor",
runner.pool_name(),
rivet_types::actors::CrashPolicy::Destroy,
rivet_types::actors::CrashPolicy::Sleep,
)
.await;
actor_ids.push(res.actor.actor_id.to_string());
Expand Down Expand Up @@ -1542,7 +1395,7 @@ fn alarm_overdue_during_sleep_transition_fires_via_reallocation() {
&namespace,
"alarm-actor",
runner.pool_name(),
rivet_types::actors::CrashPolicy::Destroy,
rivet_types::actors::CrashPolicy::Sleep,
)
.await;

Expand Down
16 changes: 8 additions & 8 deletions engine/packages/engine/tests/envoy/actors_kv_crud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ fn basic_kv_put_and_get() {
&namespace,
"kv-put-get",
runner.pool_name(),
rivet_types::actors::CrashPolicy::Destroy,
rivet_types::actors::CrashPolicy::Sleep,
)
.await;

Expand Down Expand Up @@ -485,7 +485,7 @@ fn kv_get_nonexistent_key() {
&namespace,
"kv-get-nonexistent",
runner.pool_name(),
rivet_types::actors::CrashPolicy::Destroy,
rivet_types::actors::CrashPolicy::Sleep,
)
.await;

Expand Down Expand Up @@ -525,7 +525,7 @@ fn kv_put_overwrite_existing() {
&namespace,
"kv-overwrite",
runner.pool_name(),
rivet_types::actors::CrashPolicy::Destroy,
rivet_types::actors::CrashPolicy::Sleep,
)
.await;

Expand Down Expand Up @@ -565,7 +565,7 @@ fn kv_delete_existing_key() {
&namespace,
"kv-delete",
runner.pool_name(),
rivet_types::actors::CrashPolicy::Destroy,
rivet_types::actors::CrashPolicy::Sleep,
)
.await;

Expand Down Expand Up @@ -605,7 +605,7 @@ fn kv_delete_nonexistent_key() {
&namespace,
"kv-delete-nonexistent",
runner.pool_name(),
rivet_types::actors::CrashPolicy::Destroy,
rivet_types::actors::CrashPolicy::Sleep,
)
.await;

Expand Down Expand Up @@ -898,7 +898,7 @@ fn kv_put_multiple_keys() {
&namespace,
"kv-batch-put",
runner.pool_name(),
rivet_types::actors::CrashPolicy::Destroy,
rivet_types::actors::CrashPolicy::Sleep,
)
.await;

Expand Down Expand Up @@ -937,7 +937,7 @@ fn kv_get_multiple_keys() {
&namespace,
"kv-batch-get",
runner.pool_name(),
rivet_types::actors::CrashPolicy::Destroy,
rivet_types::actors::CrashPolicy::Sleep,
)
.await;

Expand Down Expand Up @@ -976,7 +976,7 @@ fn kv_delete_multiple_keys() {
&namespace,
"kv-batch-delete",
runner.pool_name(),
rivet_types::actors::CrashPolicy::Destroy,
rivet_types::actors::CrashPolicy::Sleep,
)
.await;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ fn kv_delete_range_removes_half_open_range() {
&namespace,
"kv-delete-range",
runner.pool_name(),
rivet_types::actors::CrashPolicy::Destroy,
rivet_types::actors::CrashPolicy::Sleep,
)
.await;

Expand Down
Loading
Loading