Skip to content

Commit 7be35f1

Browse files
committed
fix(rivetkit-core): error on sleep/destroy before startup or already-requested
1 parent cfc540f commit 7be35f1

7 files changed

Lines changed: 59 additions & 23 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/actor/context.rs

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering}
55
use std::sync::{Arc, OnceLock};
66
use std::time::{Duration, SystemTime, UNIX_EPOCH};
77

8-
use anyhow::Result;
8+
use anyhow::{Context as AnyhowContext, Result};
99
use futures::future::BoxFuture;
1010
use parking_lot::{Mutex, RwLock};
1111
use rivet_envoy_client::handle::EnvoyHandle;
@@ -36,7 +36,7 @@ use crate::actor::task::{
3636
};
3737
use crate::actor::task_types::UserTaskKind;
3838
use crate::actor::work_registry::RegionGuard;
39-
use crate::error::ActorRuntime;
39+
use crate::error::{ActorLifecycle as ActorLifecycleError, ActorRuntime};
4040
use crate::inspector::{Inspector, InspectorSnapshot};
4141
use crate::kv::Kv;
4242
use crate::sqlite::SqliteDb;
@@ -384,9 +384,16 @@ impl ActorContext {
384384
self
385385
}
386386

387-
pub fn sleep(&self) {
387+
pub fn sleep(&self) -> Result<()> {
388+
if !self.0.sleep.started.load(Ordering::SeqCst) {
389+
return Err(ActorLifecycleError::Starting.build())
390+
.context("cannot request sleep before actor startup completes");
391+
}
392+
if self.0.sleep_requested.swap(true, Ordering::SeqCst) {
393+
return Err(ActorLifecycleError::Stopping.build())
394+
.context("sleep already requested for this generation");
395+
}
388396
self.cancel_sleep_timer();
389-
self.0.sleep_requested.store(true, Ordering::SeqCst);
390397
if Handle::try_current().is_ok() {
391398
let ctx = self.clone();
392399
let tracked = self.track_shutdown_task(async move {
@@ -396,15 +403,27 @@ impl ActorContext {
396403
ctx.record_user_task_finished(UserTaskKind::SleepFinalize, started_at.elapsed());
397404
});
398405
if tracked {
399-
return;
406+
return Ok(());
400407
}
401408
}
402409

403410
self.request_sleep_from_envoy();
411+
Ok(())
404412
}
405413

406-
pub fn destroy(&self) {
407-
self.mark_destroy_requested();
414+
pub fn destroy(&self) -> Result<()> {
415+
if !self.0.sleep.started.load(Ordering::SeqCst) {
416+
return Err(ActorLifecycleError::Starting.build())
417+
.context("cannot request destroy before actor startup completes");
418+
}
419+
if self.0.destroy_requested.swap(true, Ordering::SeqCst) {
420+
return Err(ActorLifecycleError::Stopping.build())
421+
.context("destroy already requested for this generation");
422+
}
423+
self.cancel_sleep_timer();
424+
self.flush_on_shutdown();
425+
self.0.destroy_completed.store(false, Ordering::SeqCst);
426+
self.0.abort_signal.cancel();
408427

409428
let ctx = self.clone();
410429
if Handle::try_current().is_ok() {
@@ -415,11 +434,12 @@ impl ActorContext {
415434
ctx.record_user_task_finished(UserTaskKind::DestroyRequest, started_at.elapsed());
416435
});
417436
if tracked {
418-
return;
437+
return Ok(());
419438
}
420439
}
421440

422441
self.request_destroy_from_envoy();
442+
Ok(())
423443
}
424444

425445
pub fn mark_destroy_requested(&self) {

rivetkit-rust/packages/rivetkit-core/src/actor/sleep.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,13 @@ impl ActorContext {
312312
sleep_timeout_ms = timeout.as_millis() as u64,
313313
"sleep idle timer elapsed"
314314
);
315-
ctx.sleep();
315+
if let Err(err) = ctx.sleep() {
316+
tracing::debug!(
317+
actor_id = %ctx.actor_id(),
318+
?err,
319+
"sleep idle timer request suppressed"
320+
);
321+
}
316322
} else {
317323
tracing::warn!(
318324
actor_id = %ctx.actor_id(),
@@ -713,9 +719,11 @@ mod tests {
713719
#[tokio::test(start_paused = true)]
714720
async fn sleep_then_destroy_signal_tasks_do_not_leak_after_teardown() {
715721
let ctx = ActorContext::new_for_sleep_tests("actor-sleep-destroy");
722+
ctx.set_sleep_started(true);
716723

717-
ctx.sleep();
718-
ctx.destroy();
724+
ctx.sleep().expect("sleep should be accepted after startup");
725+
ctx.destroy()
726+
.expect("destroy should be accepted after startup");
719727

720728
assert_eq!(
721729
ctx.shutdown_task_count(),

rivetkit-rust/packages/rivetkit-core/src/actor/task.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1952,7 +1952,13 @@ impl ActorTask {
19521952
sleep_timeout_ms = self.factory.config().sleep_timeout.as_millis() as u64,
19531953
"sleep idle deadline elapsed"
19541954
);
1955-
self.ctx.sleep();
1955+
if let Err(err) = self.ctx.sleep() {
1956+
tracing::debug!(
1957+
actor_id = %self.ctx.actor_id(),
1958+
?err,
1959+
"sleep idle deadline request suppressed"
1960+
);
1961+
}
19561962
} else {
19571963
tracing::warn!(
19581964
actor_id = %self.ctx.actor_id(),

rivetkit-rust/packages/rivetkit-core/tests/modules/context.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -814,9 +814,11 @@ mod moved_tests {
814814
crate::kv::Kv::new_in_memory(),
815815
);
816816

817+
ctx.set_sleep_started(true);
818+
817819
assert_eq!(ctx.sleep_request_count(), 0);
818820

819-
ctx.sleep();
821+
ctx.sleep().expect("sleep should be accepted after startup");
820822
tokio::task::yield_now().await;
821823

822824
assert_eq!(ctx.sleep_request_count(), 1);

rivetkit-rust/packages/rivetkit-core/tests/modules/task.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2986,7 +2986,7 @@ mod moved_tests {
29862986
}));
29872987
let factory = Arc::new(ActorFactory::new(ActorConfig::default(), move |start| {
29882988
Box::pin(async move {
2989-
start.ctx.sleep();
2989+
start.ctx.sleep()?;
29902990
Ok(())
29912991
})
29922992
}));
@@ -3053,7 +3053,7 @@ mod moved_tests {
30533053
}));
30543054
let factory = Arc::new(ActorFactory::new(ActorConfig::default(), move |start| {
30553055
Box::pin(async move {
3056-
start.ctx.destroy();
3056+
start.ctx.destroy()?;
30573057
Ok(())
30583058
})
30593059
}));

rivetkit-rust/packages/rivetkit/src/context.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,12 @@ impl<A: Actor> Ctx<A> {
8787
self.inner.save_state(deltas).await
8888
}
8989

90-
pub fn sleep(&self) {
91-
self.inner.sleep();
90+
pub fn sleep(&self) -> Result<()> {
91+
self.inner.sleep()
9292
}
9393

94-
pub fn destroy(&self) {
95-
self.inner.destroy();
94+
pub fn destroy(&self) -> Result<()> {
95+
self.inner.destroy()
9696
}
9797

9898
#[deprecated(note = "no-op: use `keep_awake` or `wait_until` instead")]

rivetkit-typescript/packages/rivetkit-napi/src/actor_context.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -431,13 +431,13 @@ impl ActorContext {
431431
}
432432

433433
#[napi]
434-
pub fn sleep(&self) {
435-
self.inner.sleep();
434+
pub fn sleep(&self) -> napi::Result<()> {
435+
self.inner.sleep().map_err(napi_anyhow_error)
436436
}
437437

438438
#[napi]
439-
pub fn destroy(&self) {
440-
self.inner.destroy();
439+
pub fn destroy(&self) -> napi::Result<()> {
440+
self.inner.destroy().map_err(napi_anyhow_error)
441441
}
442442

443443
#[napi]

0 commit comments

Comments
 (0)