Skip to content

Commit 52b2be3

Browse files
committed
fix(rivetkit): drain shutdown work before sleep
1 parent 86c11aa commit 52b2be3

6 files changed

Lines changed: 222 additions & 56 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/actor/context.rs

Lines changed: 38 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,10 @@ use crate::actor::queue::{QueueInspectorUpdateCallback, QueueMetadata, QueueWait
3232
use crate::actor::schedule::{InternalKeepAwakeCallback, LocalAlarmCallback};
3333
use crate::actor::sleep::{CanSleep, SleepState};
3434
use crate::actor::state::{PendingSave, PersistedActor, RequestSaveOpts};
35+
use crate::actor::task::LifecycleEvent;
3536
#[cfg(not(target_arch = "wasm32"))]
3637
use crate::actor::task::{LIFECYCLE_EVENT_INBOX_CHANNEL, actor_channel_overloaded_error};
37-
use crate::actor::task::LifecycleEvent;
3838
use crate::actor::task_types::UserTaskKind;
39-
#[cfg(feature = "wasm-runtime")]
40-
use crate::actor::work_registry::CountGuard;
4139
use crate::actor::work_registry::RegionGuard;
4240
use crate::error::{ActorLifecycle as ActorLifecycleError, ActorRuntime};
4341
use crate::inspector::{Inspector, InspectorSnapshot};
@@ -107,7 +105,7 @@ pub(crate) struct ActorContextInner {
107105
// Forced-sync: queue config is read from sync public methods before blocking
108106
// on async queue work.
109107
pub(super) queue_config: Mutex<ActorConfig>,
110-
pub(super) queue_abort_signal: Option<CancellationToken>,
108+
pub(super) queue_abort_signal: Mutex<CancellationToken>,
111109
pub(super) queue_initialize: OnceCell<()>,
112110
// Forced-sync: startup installs preload before any queue method awaits init.
113111
pub(super) queue_preloaded_kv: Mutex<Option<PreloadedKv>>,
@@ -135,7 +133,7 @@ pub(crate) struct ActorContextInner {
135133
destroy_requested: AtomicBool,
136134
destroy_completed: AtomicBool,
137135
destroy_completion_notify: Notify,
138-
abort_signal: CancellationToken,
136+
abort_signal: Mutex<CancellationToken>,
139137
shutdown_deadline: CancellationToken,
140138
// Forced-sync: runtime wiring slots are configured through synchronous
141139
// lifecycle setup and cloned before sending events.
@@ -280,7 +278,7 @@ impl ActorContext {
280278
#[cfg(test)]
281279
schedule_driver_alarm_cancel_count: AtomicUsize::new(0),
282280
queue_config: Mutex::new(config.clone()),
283-
queue_abort_signal: Some(abort_signal.clone()),
281+
queue_abort_signal: Mutex::new(abort_signal.clone()),
284282
queue_initialize: OnceCell::new(),
285283
queue_preloaded_kv: Mutex::new(None),
286284
queue_preloaded_message_entries: Mutex::new(None),
@@ -303,7 +301,7 @@ impl ActorContext {
303301
destroy_requested: AtomicBool::new(false),
304302
destroy_completed: AtomicBool::new(false),
305303
destroy_completion_notify: Notify::new(),
306-
abort_signal,
304+
abort_signal: Mutex::new(abort_signal),
307305
shutdown_deadline,
308306
inspector: RwLock::new(None),
309307
inspector_attach_count: RwLock::new(None),
@@ -481,30 +479,44 @@ impl ActorContext {
481479
self.flush_on_shutdown();
482480
self.0.destroy_requested.store(true, Ordering::SeqCst);
483481
self.0.destroy_completed.store(false, Ordering::SeqCst);
484-
self.0.abort_signal.cancel();
482+
self.0.abort_signal.lock().cancel();
485483
}
486484

487485
#[cfg(feature = "wasm-runtime")]
488486
fn mark_destroy_requested_without_spawn(&self) {
489487
self.cancel_sleep_timer();
490488
self.0.destroy_requested.store(true, Ordering::SeqCst);
491489
self.0.destroy_completed.store(false, Ordering::SeqCst);
492-
self.0.abort_signal.cancel();
490+
self.0.abort_signal.lock().cancel();
493491
}
494492

495493
#[doc(hidden)]
496494
pub fn cancel_abort_signal_for_sleep(&self) {
497-
self.0.abort_signal.cancel();
495+
self.0.abort_signal.lock().cancel();
496+
}
497+
498+
pub(crate) fn reset_abort_signal_for_start(&self) {
499+
let mut abort_signal = self.0.abort_signal.lock();
500+
if !abort_signal.is_cancelled() {
501+
return;
502+
}
503+
504+
// Sleep cancels the generation abort signal to break queue waits and the
505+
// run loop out of blocking calls. A restarted actor needs a fresh signal
506+
// so the next generation can wait normally.
507+
let next_signal = CancellationToken::new();
508+
*abort_signal = next_signal.clone();
509+
*self.0.queue_abort_signal.lock() = next_signal;
498510
}
499511

500512
#[doc(hidden)]
501513
pub fn actor_abort_signal(&self) -> CancellationToken {
502-
self.0.abort_signal.clone()
514+
self.0.abort_signal.lock().clone()
503515
}
504516

505517
#[doc(hidden)]
506518
pub fn actor_aborted(&self) -> bool {
507-
self.0.abort_signal.is_cancelled()
519+
self.0.abort_signal.lock().is_cancelled()
508520
}
509521

510522
/// Fires when the shutdown grace deadline has elapsed and core is forcing
@@ -562,12 +574,8 @@ impl ActorContext {
562574

563575
#[cfg(feature = "wasm-runtime")]
564576
pub fn wait_until(&self, future: impl Future<Output = ()> + 'static) {
565-
let counter = self.0.sleep.work.shutdown_counter.clone();
566-
counter.increment();
567-
let guard = CountGuard::from_incremented(counter);
568577
let ctx = self.clone();
569-
wasm_bindgen_futures::spawn_local(async move {
570-
let _guard = guard;
578+
self.track_shutdown_task(async move {
571579
ctx.record_user_task_started(UserTaskKind::WaitUntil);
572580
let started_at = Instant::now();
573581
future.await;
@@ -1314,6 +1322,10 @@ impl ActorContext {
13141322
self.0.sleep_requested.load(Ordering::SeqCst)
13151323
}
13161324

1325+
pub(crate) fn clear_sleep_requested(&self) {
1326+
self.0.sleep_requested.store(false, Ordering::SeqCst);
1327+
}
1328+
13171329
fn keep_awake_guard(&self) -> KeepAwakeGuard {
13181330
let region = self
13191331
.keep_awake_region()
@@ -1484,15 +1496,15 @@ impl ActorContext {
14841496
};
14851497

14861498
match sender.try_reserve() {
1487-
Ok(permit) => {
1488-
permit.send(event);
1489-
}
1490-
#[cfg(target_arch = "wasm32")]
1491-
Err(_) => {}
1492-
#[cfg(not(target_arch = "wasm32"))]
1493-
Err(_) => {
1494-
let _ = actor_channel_overloaded_error(
1495-
LIFECYCLE_EVENT_INBOX_CHANNEL,
1499+
Ok(permit) => {
1500+
permit.send(event);
1501+
}
1502+
#[cfg(target_arch = "wasm32")]
1503+
Err(_) => {}
1504+
#[cfg(not(target_arch = "wasm32"))]
1505+
Err(_) => {
1506+
let _ = actor_channel_overloaded_error(
1507+
LIFECYCLE_EVENT_INBOX_CHANNEL,
14961508
self.0.lifecycle_event_inbox_capacity,
14971509
operation,
14981510
Some(&self.0.metrics),

rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -818,25 +818,17 @@ impl ActorContext {
818818
timeout: Option<Duration>,
819819
signal: Option<&CancellationToken>,
820820
) -> WaitOutcome {
821+
let actor_abort_signal = self.0.queue_abort_signal.lock().clone();
821822
if signal.is_some_and(CancellationToken::is_cancelled) {
822823
return WaitOutcome::Aborted;
823824
}
824-
if self
825-
.0
826-
.queue_abort_signal
827-
.as_ref()
828-
.is_some_and(CancellationToken::is_cancelled)
829-
{
825+
if actor_abort_signal.is_cancelled() {
830826
return WaitOutcome::Aborted;
831827
}
832828

833829
let notified = self.0.queue_notify.notified();
834830
let actor_aborted = async {
835-
if let Some(signal) = &self.0.queue_abort_signal {
836-
signal.cancelled().await;
837-
} else {
838-
pending::<()>().await;
839-
}
831+
actor_abort_signal.cancelled().await;
840832
};
841833
let external_aborted = async {
842834
if let Some(signal) = signal {

0 commit comments

Comments
 (0)