Skip to content

Commit ec8b474

Browse files
committed
fix(rivetkit): drain shutdown work before sleep
1 parent 86c11aa commit ec8b474

6 files changed

Lines changed: 206 additions & 62 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/actor/context.rs

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,10 @@ use crate::actor::queue::{QueueInspectorUpdateCallback, QueueMetadata, QueueWait
3232
use crate::actor::schedule::{InternalKeepAwakeCallback, LocalAlarmCallback};
3333
use crate::actor::sleep::{CanSleep, SleepState};
3434
use crate::actor::state::{PendingSave, PersistedActor, RequestSaveOpts};
35+
use crate::actor::task::LifecycleEvent;
3536
#[cfg(not(target_arch = "wasm32"))]
3637
use crate::actor::task::{LIFECYCLE_EVENT_INBOX_CHANNEL, actor_channel_overloaded_error};
37-
use crate::actor::task::LifecycleEvent;
3838
use crate::actor::task_types::UserTaskKind;
39-
#[cfg(feature = "wasm-runtime")]
40-
use crate::actor::work_registry::CountGuard;
4139
use crate::actor::work_registry::RegionGuard;
4240
use crate::error::{ActorLifecycle as ActorLifecycleError, ActorRuntime};
4341
use crate::inspector::{Inspector, InspectorSnapshot};
@@ -107,7 +105,7 @@ pub(crate) struct ActorContextInner {
107105
// Forced-sync: queue config is read from sync public methods before blocking
108106
// on async queue work.
109107
pub(super) queue_config: Mutex<ActorConfig>,
110-
pub(super) queue_abort_signal: Option<CancellationToken>,
108+
pub(super) queue_abort_signal: Mutex<CancellationToken>,
111109
pub(super) queue_initialize: OnceCell<()>,
112110
// Forced-sync: startup installs preload before any queue method awaits init.
113111
pub(super) queue_preloaded_kv: Mutex<Option<PreloadedKv>>,
@@ -135,7 +133,7 @@ pub(crate) struct ActorContextInner {
135133
destroy_requested: AtomicBool,
136134
destroy_completed: AtomicBool,
137135
destroy_completion_notify: Notify,
138-
abort_signal: CancellationToken,
136+
abort_signal: Mutex<CancellationToken>,
139137
shutdown_deadline: CancellationToken,
140138
// Forced-sync: runtime wiring slots are configured through synchronous
141139
// lifecycle setup and cloned before sending events.
@@ -280,7 +278,7 @@ impl ActorContext {
280278
#[cfg(test)]
281279
schedule_driver_alarm_cancel_count: AtomicUsize::new(0),
282280
queue_config: Mutex::new(config.clone()),
283-
queue_abort_signal: Some(abort_signal.clone()),
281+
queue_abort_signal: Mutex::new(abort_signal.clone()),
284282
queue_initialize: OnceCell::new(),
285283
queue_preloaded_kv: Mutex::new(None),
286284
queue_preloaded_message_entries: Mutex::new(None),
@@ -303,7 +301,7 @@ impl ActorContext {
303301
destroy_requested: AtomicBool::new(false),
304302
destroy_completed: AtomicBool::new(false),
305303
destroy_completion_notify: Notify::new(),
306-
abort_signal,
304+
abort_signal: Mutex::new(abort_signal),
307305
shutdown_deadline,
308306
inspector: RwLock::new(None),
309307
inspector_attach_count: RwLock::new(None),
@@ -481,30 +479,41 @@ impl ActorContext {
481479
self.flush_on_shutdown();
482480
self.0.destroy_requested.store(true, Ordering::SeqCst);
483481
self.0.destroy_completed.store(false, Ordering::SeqCst);
484-
self.0.abort_signal.cancel();
482+
self.0.abort_signal.lock().cancel();
485483
}
486484

487485
#[cfg(feature = "wasm-runtime")]
488486
fn mark_destroy_requested_without_spawn(&self) {
489487
self.cancel_sleep_timer();
490488
self.0.destroy_requested.store(true, Ordering::SeqCst);
491489
self.0.destroy_completed.store(false, Ordering::SeqCst);
492-
self.0.abort_signal.cancel();
490+
self.0.abort_signal.lock().cancel();
493491
}
494492

495493
#[doc(hidden)]
496494
pub fn cancel_abort_signal_for_sleep(&self) {
497-
self.0.abort_signal.cancel();
495+
self.0.abort_signal.lock().cancel();
496+
}
497+
498+
pub(crate) fn reset_abort_signal_for_start(&self) {
499+
let mut abort_signal = self.0.abort_signal.lock();
500+
if !abort_signal.is_cancelled() {
501+
return;
502+
}
503+
504+
let next_signal = CancellationToken::new();
505+
*abort_signal = next_signal.clone();
506+
*self.0.queue_abort_signal.lock() = next_signal;
498507
}
499508

500509
#[doc(hidden)]
501510
pub fn actor_abort_signal(&self) -> CancellationToken {
502-
self.0.abort_signal.clone()
511+
self.0.abort_signal.lock().clone()
503512
}
504513

505514
#[doc(hidden)]
506515
pub fn actor_aborted(&self) -> bool {
507-
self.0.abort_signal.is_cancelled()
516+
self.0.abort_signal.lock().is_cancelled()
508517
}
509518

510519
/// Fires when the shutdown grace deadline has elapsed and core is forcing
@@ -562,12 +571,8 @@ impl ActorContext {
562571

563572
#[cfg(feature = "wasm-runtime")]
564573
pub fn wait_until(&self, future: impl Future<Output = ()> + 'static) {
565-
let counter = self.0.sleep.work.shutdown_counter.clone();
566-
counter.increment();
567-
let guard = CountGuard::from_incremented(counter);
568574
let ctx = self.clone();
569-
wasm_bindgen_futures::spawn_local(async move {
570-
let _guard = guard;
575+
self.track_shutdown_task(async move {
571576
ctx.record_user_task_started(UserTaskKind::WaitUntil);
572577
let started_at = Instant::now();
573578
future.await;
@@ -1314,6 +1319,10 @@ impl ActorContext {
13141319
self.0.sleep_requested.load(Ordering::SeqCst)
13151320
}
13161321

1322+
pub(crate) fn clear_sleep_requested(&self) {
1323+
self.0.sleep_requested.store(false, Ordering::SeqCst);
1324+
}
1325+
13171326
fn keep_awake_guard(&self) -> KeepAwakeGuard {
13181327
let region = self
13191328
.keep_awake_region()
@@ -1484,15 +1493,15 @@ impl ActorContext {
14841493
};
14851494

14861495
match sender.try_reserve() {
1487-
Ok(permit) => {
1488-
permit.send(event);
1489-
}
1490-
#[cfg(target_arch = "wasm32")]
1491-
Err(_) => {}
1492-
#[cfg(not(target_arch = "wasm32"))]
1493-
Err(_) => {
1494-
let _ = actor_channel_overloaded_error(
1495-
LIFECYCLE_EVENT_INBOX_CHANNEL,
1496+
Ok(permit) => {
1497+
permit.send(event);
1498+
}
1499+
#[cfg(target_arch = "wasm32")]
1500+
Err(_) => {}
1501+
#[cfg(not(target_arch = "wasm32"))]
1502+
Err(_) => {
1503+
let _ = actor_channel_overloaded_error(
1504+
LIFECYCLE_EVENT_INBOX_CHANNEL,
14961505
self.0.lifecycle_event_inbox_capacity,
14971506
operation,
14981507
Some(&self.0.metrics),

rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -818,25 +818,17 @@ impl ActorContext {
818818
timeout: Option<Duration>,
819819
signal: Option<&CancellationToken>,
820820
) -> WaitOutcome {
821+
let actor_abort_signal = self.0.queue_abort_signal.lock().clone();
821822
if signal.is_some_and(CancellationToken::is_cancelled) {
822823
return WaitOutcome::Aborted;
823824
}
824-
if self
825-
.0
826-
.queue_abort_signal
827-
.as_ref()
828-
.is_some_and(CancellationToken::is_cancelled)
829-
{
825+
if actor_abort_signal.is_cancelled() {
830826
return WaitOutcome::Aborted;
831827
}
832828

833829
let notified = self.0.queue_notify.notified();
834830
let actor_aborted = async {
835-
if let Some(signal) = &self.0.queue_abort_signal {
836-
signal.cancelled().await;
837-
} else {
838-
pending::<()>().await;
839-
}
831+
actor_abort_signal.cancelled().await;
840832
};
841833
let external_aborted = async {
842834
if let Some(signal) = signal {

rivetkit-rust/packages/rivetkit-core/src/actor/sleep.rs

Lines changed: 126 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,21 @@ use tracing::Instrument;
1414

1515
use crate::actor::config::ActorConfig;
1616
use crate::actor::context::ActorContext;
17+
use crate::actor::task_types::ShutdownKind;
1718
use crate::actor::work_registry::{CountGuard, RegionGuard, WorkRegistry};
1819
#[cfg(feature = "wasm-runtime")]
20+
use crate::actor::work_registry::LocalShutdownTask;
21+
#[cfg(feature = "wasm-runtime")]
1922
use crate::runtime::RuntimeSpawner;
20-
use crate::time::{Instant, sleep};
2123
#[cfg(test)]
2224
use crate::time::sleep_until;
25+
use crate::time::{Instant, sleep};
2326
#[cfg(test)]
2427
use crate::types::ActorKey;
28+
#[cfg(feature = "wasm-runtime")]
29+
use futures::channel::oneshot as futures_oneshot;
30+
#[cfg(feature = "wasm-runtime")]
31+
use futures::future::{AbortHandle, Abortable};
2532

2633
/// Per-actor sleep state.
2734
///
@@ -250,8 +257,9 @@ impl ActorContext {
250257
CanSleep::Yes
251258
}
252259

253-
pub(crate) fn can_finalize_sleep(&self) -> bool {
260+
pub(crate) fn can_finalize_shutdown(&self, reason: ShutdownKind) -> bool {
254261
self.0.sleep.work.core_dispatched_hooks.load() == 0
262+
&& (matches!(reason, ShutdownKind::Destroy) || !self.run_handler_active())
255263
&& self.shutdown_task_count() == 0
256264
&& self.sleep_keep_awake_count() == 0
257265
&& self.sleep_internal_keep_awake_count() == 0
@@ -359,7 +367,7 @@ impl ActorContext {
359367
tokio::pin!(idle);
360368
idle.as_mut().enable();
361369

362-
if self.can_finalize_sleep() {
370+
if self.can_finalize_shutdown(ShutdownKind::Sleep) {
363371
return true;
364372
}
365373

@@ -472,12 +480,16 @@ impl ActorContext {
472480
let ctx = self.clone();
473481
shutdown_tasks.spawn(
474482
async move {
475-
let _guard = guard;
476-
fut.await;
483+
{
484+
let _guard = guard;
485+
fut.await;
486+
}
477487
ctx.reset_sleep_timer();
478488
}
479489
.in_current_span(),
480490
);
491+
drop(shutdown_tasks);
492+
self.reset_sleep_timer();
481493
true
482494
}
483495

@@ -486,6 +498,7 @@ impl ActorContext {
486498
where
487499
F: Future<Output = ()> + 'static,
488500
{
501+
let mut local_shutdown_tasks = self.0.sleep.work.local_shutdown_tasks.lock();
489502
if self.0.sleep.work.teardown_started.load(Ordering::Acquire) {
490503
tracing::warn!("shutdown task spawned after teardown; aborting immediately");
491504
return false;
@@ -494,21 +507,46 @@ impl ActorContext {
494507
counter.increment();
495508
let guard = CountGuard::from_incremented(counter);
496509
let ctx = self.clone();
510+
let (complete_tx, complete_rx) = futures_oneshot::channel();
511+
let (abort_handle, abort_registration) = AbortHandle::new_pair();
512+
local_shutdown_tasks.push(LocalShutdownTask {
513+
abort_handle,
514+
complete_rx,
515+
});
516+
drop(local_shutdown_tasks);
517+
let ctx_for_task = ctx.clone();
497518
wasm_bindgen_futures::spawn_local(
498519
async move {
499-
let _guard = guard;
500-
fut.await;
501-
ctx.reset_sleep_timer();
520+
let task = async move {
521+
{
522+
let _guard = guard;
523+
fut.await;
524+
}
525+
let _ = complete_tx.send(());
526+
ctx_for_task.reset_sleep_timer();
527+
};
528+
if Abortable::new(task, abort_registration).await.is_err() {
529+
ctx.reset_sleep_timer();
530+
}
502531
}
503532
.in_current_span(),
504533
);
534+
self.reset_sleep_timer();
505535
true
506536
}
507537

508538
pub(crate) fn shutdown_task_count(&self) -> usize {
509539
self.0.sleep.work.shutdown_counter.load()
510540
}
511541

542+
pub(crate) fn mark_shutdown_deadline_reached(&self) {
543+
self.0
544+
.sleep
545+
.work
546+
.shutdown_deadline_reached
547+
.store(true, Ordering::Release);
548+
}
549+
512550
pub(crate) fn begin_core_dispatched_hook(&self) {
513551
self.0.sleep.work.core_dispatched_hooks.increment();
514552
self.reset_sleep_timer();
@@ -524,17 +562,88 @@ impl ActorContext {
524562
}
525563

526564
pub(crate) async fn teardown_sleep_state(&self) {
527-
self.0
565+
let abort_remaining = self
566+
.0
528567
.sleep
529568
.work
530-
.teardown_started
531-
.store(true, Ordering::Release);
532-
let mut shutdown_tasks = {
533-
let mut guard = self.0.sleep.work.shutdown_tasks.lock();
534-
std::mem::take(&mut *guard)
535-
};
536-
shutdown_tasks.shutdown().await;
537-
*self.0.sleep.work.shutdown_tasks.lock() = shutdown_tasks;
569+
.shutdown_deadline_reached
570+
.swap(false, Ordering::AcqRel);
571+
if abort_remaining {
572+
self.0
573+
.sleep
574+
.work
575+
.teardown_started
576+
.store(true, Ordering::Release);
577+
}
578+
579+
#[cfg(feature = "wasm-runtime")]
580+
{
581+
loop {
582+
let local_shutdown_tasks = {
583+
let mut guard = self.0.sleep.work.local_shutdown_tasks.lock();
584+
let taken = std::mem::take(&mut *guard);
585+
if taken.is_empty() {
586+
self.0
587+
.sleep
588+
.work
589+
.teardown_started
590+
.store(true, Ordering::Release);
591+
return;
592+
}
593+
taken
594+
};
595+
596+
if abort_remaining {
597+
for task in local_shutdown_tasks {
598+
task.abort_handle.abort();
599+
if task.complete_rx.await.is_err() {
600+
tracing::debug!("aborted shutdown task during teardown");
601+
}
602+
}
603+
self.0
604+
.sleep
605+
.work
606+
.teardown_started
607+
.store(true, Ordering::Release);
608+
return;
609+
}
610+
611+
for task in local_shutdown_tasks {
612+
if task.complete_rx.await.is_err() {
613+
tracing::debug!("shutdown task completion dropped during teardown");
614+
}
615+
}
616+
}
617+
}
618+
619+
#[cfg(not(feature = "wasm-runtime"))]
620+
loop {
621+
let mut shutdown_tasks = {
622+
let mut guard = self.0.sleep.work.shutdown_tasks.lock();
623+
let taken = std::mem::take(&mut *guard);
624+
if taken.is_empty() {
625+
self.0
626+
.sleep
627+
.work
628+
.teardown_started
629+
.store(true, Ordering::Release);
630+
return;
631+
}
632+
taken
633+
};
634+
635+
if abort_remaining {
636+
shutdown_tasks.shutdown().await;
637+
} else {
638+
while let Some(result) = shutdown_tasks.join_next().await {
639+
if let Err(error) = result
640+
&& !error.is_cancelled()
641+
{
642+
tracing::error!(?error, "shutdown task join failed during teardown");
643+
}
644+
}
645+
}
646+
}
538647
}
539648

540649
pub(crate) fn sleep_state_config(&self) -> ActorConfig {

0 commit comments

Comments
 (0)