Skip to content

Commit e054f95

Browse files
committed
fix(rivetkit-core): allow dispatch during sleep grace
1 parent ae09be0 commit e054f95

4 files changed

Lines changed: 45 additions & 13 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/actor/task.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1164,16 +1164,10 @@ impl ActorTask {
11641164
self.ctx.warn_work_sent_to_stopping_instance("dispatch");
11651165
return Some(ActorLifecycleError::Destroying.build());
11661166
}
1167-
if self.ctx.sleep_requested() {
1168-
self.ctx.warn_work_sent_to_stopping_instance("dispatch");
1169-
return Some(ActorLifecycleError::Stopping.build());
1170-
}
11711167

11721168
match self.lifecycle {
1173-
LifecycleState::Started => None,
1174-
LifecycleState::SleepGrace
1175-
| LifecycleState::SleepFinalize
1176-
| LifecycleState::DestroyGrace => {
1169+
LifecycleState::Started | LifecycleState::SleepGrace => None,
1170+
LifecycleState::SleepFinalize | LifecycleState::DestroyGrace => {
11771171
self.ctx.warn_work_sent_to_stopping_instance("dispatch");
11781172
Some(ActorLifecycleError::Stopping.build())
11791173
}

rivetkit-rust/packages/rivetkit-core/tests/context.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,7 @@ mod moved_tests {
312312
envoy_key: "test-envoy".to_string(),
313313
envoy_tx,
314314
actors: Arc::new(std::sync::Mutex::new(HashMap::new())),
315+
actors_notify: Arc::new(tokio::sync::Notify::new()),
315316
live_tunnel_requests,
316317
pending_hibernation_restores: Arc::new(std::sync::Mutex::new(HashMap::from([(
317318
actor_id.to_owned(),
@@ -360,6 +361,7 @@ mod moved_tests {
360361
envoy_key: "test-envoy".to_string(),
361362
envoy_tx,
362363
actors: Arc::new(std::sync::Mutex::new(HashMap::new())),
364+
actors_notify: Arc::new(tokio::sync::Notify::new()),
363365
live_tunnel_requests: Arc::new(std::sync::Mutex::new(HashMap::new())),
364366
pending_hibernation_restores: Arc::new(std::sync::Mutex::new(HashMap::new())),
365367
ws_tx: Arc::new(tokio::sync::Mutex::new(

rivetkit-rust/packages/rivetkit-core/tests/schedule.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ mod moved_tests {
8888
envoy_key: "test-envoy".to_string(),
8989
envoy_tx,
9090
actors: Arc::new(EnvoySharedMutex::new(HashMap::new())),
91+
actors_notify: Arc::new(tokio::sync::Notify::new()),
9192
live_tunnel_requests: Arc::new(EnvoySharedMutex::new(HashMap::new())),
9293
pending_hibernation_restores: Arc::new(EnvoySharedMutex::new(HashMap::new())),
9394
ws_tx: Arc::new(tokio::sync::Mutex::new(

rivetkit-rust/packages/rivetkit-core/tests/task.rs

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ mod moved_tests {
242242
envoy_key: "test-envoy".to_string(),
243243
envoy_tx,
244244
actors: Arc::new(Mutex::new(HashMap::new())),
245+
actors_notify: Arc::new(tokio::sync::Notify::new()),
245246
live_tunnel_requests: Arc::new(Mutex::new(HashMap::new())),
246247
pending_hibernation_restores: Arc::new(Mutex::new(HashMap::new())),
247248
ws_tx: Arc::new(tokio::sync::Mutex::new(
@@ -2812,11 +2813,14 @@ mod moved_tests {
28122813
})
28132814
.await
28142815
.expect("action should send during sleep grace");
2815-
let _error = action_rx
2816-
.await
2817-
.expect("action reply should send")
2818-
.expect_err("sleep grace should reject new dispatch");
2819-
assert_eq!(action_count.load(Ordering::SeqCst), 0);
2816+
assert_eq!(
2817+
action_rx
2818+
.await
2819+
.expect("action reply should send")
2820+
.expect("sleep grace should accept new dispatch"),
2821+
vec![7, 7, 7]
2822+
);
2823+
assert_eq!(action_count.load(Ordering::SeqCst), 1);
28202824
assert_eq!(destroy_count.load(Ordering::SeqCst), 0);
28212825

28222826
release_tx.send(()).expect("keep-awake release should send");
@@ -2833,6 +2837,37 @@ mod moved_tests {
28332837
.expect("task run should succeed");
28342838
}
28352839

2840+
#[tokio::test]
2841+
async fn sleep_finalize_rejects_new_dispatch() {
2842+
let ctx = new_with_kv(
2843+
"actor-sleep-finalize-dispatch",
2844+
"task-sleep-finalize-dispatch",
2845+
Vec::new(),
2846+
"local",
2847+
new_in_memory(),
2848+
);
2849+
let mut task = new_task(ctx);
2850+
task.lifecycle = LifecycleState::SleepFinalize;
2851+
2852+
let (reply_tx, reply_rx) = oneshot::channel();
2853+
task.handle_dispatch(DispatchCommand::Action {
2854+
name: "ping".to_owned(),
2855+
args: Vec::new(),
2856+
conn: ConnHandle::new("conn-finalize", Vec::new(), Vec::new(), false),
2857+
reply: reply_tx,
2858+
})
2859+
.await;
2860+
2861+
let error = reply_rx
2862+
.await
2863+
.expect("action reply should send")
2864+
.expect_err("sleep finalize should reject new dispatch");
2865+
assert!(
2866+
error.to_string().contains("Actor is stopping"),
2867+
"expected actor stopping error, got {error:#}"
2868+
);
2869+
}
2870+
28362871
#[cfg(not(debug_assertions))]
28372872
#[tokio::test]
28382873
async fn duplicate_destroy_during_sleep_grace_is_acked_and_ignored_in_release() {

0 commit comments

Comments
 (0)