Skip to content

Commit 2f3266d

Browse files
committed
fix(rivetkit): drain shutdown work before sleep
1 parent 3dad87c commit 2f3266d

6 files changed

Lines changed: 173 additions & 55 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/actor/context.rs

Lines changed: 35 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -32,12 +32,10 @@ use crate::actor::queue::{QueueInspectorUpdateCallback, QueueMetadata, QueueWait
3232
use crate::actor::schedule::{InternalKeepAwakeCallback, LocalAlarmCallback};
3333
use crate::actor::sleep::{CanSleep, SleepState};
3434
use crate::actor::state::{PendingSave, PersistedActor, RequestSaveOpts};
35+
use crate::actor::task::LifecycleEvent;
3536
#[cfg(not(target_arch = "wasm32"))]
3637
use crate::actor::task::{LIFECYCLE_EVENT_INBOX_CHANNEL, actor_channel_overloaded_error};
37-
use crate::actor::task::LifecycleEvent;
3838
use crate::actor::task_types::UserTaskKind;
39-
#[cfg(feature = "wasm-runtime")]
40-
use crate::actor::work_registry::CountGuard;
4139
use crate::actor::work_registry::RegionGuard;
4240
use crate::error::{ActorLifecycle as ActorLifecycleError, ActorRuntime};
4341
use crate::inspector::{Inspector, InspectorSnapshot};
@@ -107,7 +105,7 @@ pub(crate) struct ActorContextInner {
107105
// Forced-sync: queue config is read from sync public methods before blocking
108106
// on async queue work.
109107
pub(super) queue_config: Mutex<ActorConfig>,
110-
pub(super) queue_abort_signal: Option<CancellationToken>,
108+
pub(super) queue_abort_signal: Mutex<CancellationToken>,
111109
pub(super) queue_initialize: OnceCell<()>,
112110
// Forced-sync: startup installs preload before any queue method awaits init.
113111
pub(super) queue_preloaded_kv: Mutex<Option<PreloadedKv>>,
@@ -135,7 +133,7 @@ pub(crate) struct ActorContextInner {
135133
destroy_requested: AtomicBool,
136134
destroy_completed: AtomicBool,
137135
destroy_completion_notify: Notify,
138-
abort_signal: CancellationToken,
136+
abort_signal: Mutex<CancellationToken>,
139137
shutdown_deadline: CancellationToken,
140138
// Forced-sync: runtime wiring slots are configured through synchronous
141139
// lifecycle setup and cloned before sending events.
@@ -280,7 +278,7 @@ impl ActorContext {
280278
#[cfg(test)]
281279
schedule_driver_alarm_cancel_count: AtomicUsize::new(0),
282280
queue_config: Mutex::new(config.clone()),
283-
queue_abort_signal: Some(abort_signal.clone()),
281+
queue_abort_signal: Mutex::new(abort_signal.clone()),
284282
queue_initialize: OnceCell::new(),
285283
queue_preloaded_kv: Mutex::new(None),
286284
queue_preloaded_message_entries: Mutex::new(None),
@@ -303,7 +301,7 @@ impl ActorContext {
303301
destroy_requested: AtomicBool::new(false),
304302
destroy_completed: AtomicBool::new(false),
305303
destroy_completion_notify: Notify::new(),
306-
abort_signal,
304+
abort_signal: Mutex::new(abort_signal),
307305
shutdown_deadline,
308306
inspector: RwLock::new(None),
309307
inspector_attach_count: RwLock::new(None),
@@ -481,30 +479,41 @@ impl ActorContext {
481479
self.flush_on_shutdown();
482480
self.0.destroy_requested.store(true, Ordering::SeqCst);
483481
self.0.destroy_completed.store(false, Ordering::SeqCst);
484-
self.0.abort_signal.cancel();
482+
self.0.abort_signal.lock().cancel();
485483
}
486484

487485
#[cfg(feature = "wasm-runtime")]
488486
fn mark_destroy_requested_without_spawn(&self) {
489487
self.cancel_sleep_timer();
490488
self.0.destroy_requested.store(true, Ordering::SeqCst);
491489
self.0.destroy_completed.store(false, Ordering::SeqCst);
492-
self.0.abort_signal.cancel();
490+
self.0.abort_signal.lock().cancel();
493491
}
494492

495493
#[doc(hidden)]
496494
pub fn cancel_abort_signal_for_sleep(&self) {
497-
self.0.abort_signal.cancel();
495+
self.0.abort_signal.lock().cancel();
496+
}
497+
498+
pub(crate) fn reset_abort_signal_for_start(&self) {
499+
let mut abort_signal = self.0.abort_signal.lock();
500+
if !abort_signal.is_cancelled() {
501+
return;
502+
}
503+
504+
let next_signal = CancellationToken::new();
505+
*abort_signal = next_signal.clone();
506+
*self.0.queue_abort_signal.lock() = next_signal;
498507
}
499508

500509
#[doc(hidden)]
501510
pub fn actor_abort_signal(&self) -> CancellationToken {
502-
self.0.abort_signal.clone()
511+
self.0.abort_signal.lock().clone()
503512
}
504513

505514
#[doc(hidden)]
506515
pub fn actor_aborted(&self) -> bool {
507-
self.0.abort_signal.is_cancelled()
516+
self.0.abort_signal.lock().is_cancelled()
508517
}
509518

510519
/// Fires when the shutdown grace deadline has elapsed and core is forcing
@@ -562,12 +571,8 @@ impl ActorContext {
562571

563572
#[cfg(feature = "wasm-runtime")]
564573
pub fn wait_until(&self, future: impl Future<Output = ()> + 'static) {
565-
let counter = self.0.sleep.work.shutdown_counter.clone();
566-
counter.increment();
567-
let guard = CountGuard::from_incremented(counter);
568574
let ctx = self.clone();
569-
wasm_bindgen_futures::spawn_local(async move {
570-
let _guard = guard;
575+
self.track_shutdown_task(async move {
571576
ctx.record_user_task_started(UserTaskKind::WaitUntil);
572577
let started_at = Instant::now();
573578
future.await;
@@ -1314,6 +1319,10 @@ impl ActorContext {
13141319
self.0.sleep_requested.load(Ordering::SeqCst)
13151320
}
13161321

1322+
pub(crate) fn clear_sleep_requested(&self) {
1323+
self.0.sleep_requested.store(false, Ordering::SeqCst);
1324+
}
1325+
13171326
fn keep_awake_guard(&self) -> KeepAwakeGuard {
13181327
let region = self
13191328
.keep_awake_region()
@@ -1484,15 +1493,15 @@ impl ActorContext {
14841493
};
14851494

14861495
match sender.try_reserve() {
1487-
Ok(permit) => {
1488-
permit.send(event);
1489-
}
1490-
#[cfg(target_arch = "wasm32")]
1491-
Err(_) => {}
1492-
#[cfg(not(target_arch = "wasm32"))]
1493-
Err(_) => {
1494-
let _ = actor_channel_overloaded_error(
1495-
LIFECYCLE_EVENT_INBOX_CHANNEL,
1496+
Ok(permit) => {
1497+
permit.send(event);
1498+
}
1499+
#[cfg(target_arch = "wasm32")]
1500+
Err(_) => {}
1501+
#[cfg(not(target_arch = "wasm32"))]
1502+
Err(_) => {
1503+
let _ = actor_channel_overloaded_error(
1504+
LIFECYCLE_EVENT_INBOX_CHANNEL,
14961505
self.0.lifecycle_event_inbox_capacity,
14971506
operation,
14981507
Some(&self.0.metrics),

rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -818,25 +818,17 @@ impl ActorContext {
818818
timeout: Option<Duration>,
819819
signal: Option<&CancellationToken>,
820820
) -> WaitOutcome {
821+
let actor_abort_signal = self.0.queue_abort_signal.lock().clone();
821822
if signal.is_some_and(CancellationToken::is_cancelled) {
822823
return WaitOutcome::Aborted;
823824
}
824-
if self
825-
.0
826-
.queue_abort_signal
827-
.as_ref()
828-
.is_some_and(CancellationToken::is_cancelled)
829-
{
825+
if actor_abort_signal.is_cancelled() {
830826
return WaitOutcome::Aborted;
831827
}
832828

833829
let notified = self.0.queue_notify.notified();
834830
let actor_aborted = async {
835-
if let Some(signal) = &self.0.queue_abort_signal {
836-
signal.cancelled().await;
837-
} else {
838-
pending::<()>().await;
839-
}
831+
actor_abort_signal.cancelled().await;
840832
};
841833
let external_aborted = async {
842834
if let Some(signal) = signal {

rivetkit-rust/packages/rivetkit-core/src/actor/sleep.rs

Lines changed: 106 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,17 @@ use tracing::Instrument;
1414

1515
use crate::actor::config::ActorConfig;
1616
use crate::actor::context::ActorContext;
17+
use crate::actor::task_types::ShutdownKind;
1718
use crate::actor::work_registry::{CountGuard, RegionGuard, WorkRegistry};
1819
#[cfg(feature = "wasm-runtime")]
1920
use crate::runtime::RuntimeSpawner;
20-
use crate::time::{Instant, sleep};
2121
#[cfg(test)]
2222
use crate::time::sleep_until;
23+
use crate::time::{Instant, sleep};
2324
#[cfg(test)]
2425
use crate::types::ActorKey;
26+
#[cfg(feature = "wasm-runtime")]
27+
use futures::channel::oneshot as futures_oneshot;
2528

2629
/// Per-actor sleep state.
2730
///
@@ -250,8 +253,9 @@ impl ActorContext {
250253
CanSleep::Yes
251254
}
252255

253-
pub(crate) fn can_finalize_sleep(&self) -> bool {
256+
pub(crate) fn can_finalize_shutdown(&self, reason: ShutdownKind) -> bool {
254257
self.0.sleep.work.core_dispatched_hooks.load() == 0
258+
&& (matches!(reason, ShutdownKind::Destroy) || !self.run_handler_active())
255259
&& self.shutdown_task_count() == 0
256260
&& self.sleep_keep_awake_count() == 0
257261
&& self.sleep_internal_keep_awake_count() == 0
@@ -359,7 +363,7 @@ impl ActorContext {
359363
tokio::pin!(idle);
360364
idle.as_mut().enable();
361365

362-
if self.can_finalize_sleep() {
366+
if self.can_finalize_shutdown(ShutdownKind::Sleep) {
363367
return true;
364368
}
365369

@@ -472,12 +476,16 @@ impl ActorContext {
472476
let ctx = self.clone();
473477
shutdown_tasks.spawn(
474478
async move {
475-
let _guard = guard;
476-
fut.await;
479+
{
480+
let _guard = guard;
481+
fut.await;
482+
}
477483
ctx.reset_sleep_timer();
478484
}
479485
.in_current_span(),
480486
);
487+
drop(shutdown_tasks);
488+
self.reset_sleep_timer();
481489
true
482490
}
483491

@@ -486,6 +494,7 @@ impl ActorContext {
486494
where
487495
F: Future<Output = ()> + 'static,
488496
{
497+
let mut local_shutdown_tasks = self.0.sleep.work.local_shutdown_tasks.lock();
489498
if self.0.sleep.work.teardown_started.load(Ordering::Acquire) {
490499
tracing::warn!("shutdown task spawned after teardown; aborting immediately");
491500
return false;
@@ -494,21 +503,36 @@ impl ActorContext {
494503
counter.increment();
495504
let guard = CountGuard::from_incremented(counter);
496505
let ctx = self.clone();
506+
let (complete_tx, complete_rx) = futures_oneshot::channel();
507+
local_shutdown_tasks.push(complete_rx);
508+
drop(local_shutdown_tasks);
497509
wasm_bindgen_futures::spawn_local(
498510
async move {
499-
let _guard = guard;
500-
fut.await;
511+
{
512+
let _guard = guard;
513+
fut.await;
514+
}
515+
let _ = complete_tx.send(());
501516
ctx.reset_sleep_timer();
502517
}
503518
.in_current_span(),
504519
);
520+
self.reset_sleep_timer();
505521
true
506522
}
507523

508524
pub(crate) fn shutdown_task_count(&self) -> usize {
509525
self.0.sleep.work.shutdown_counter.load()
510526
}
511527

528+
pub(crate) fn mark_shutdown_deadline_reached(&self) {
529+
self.0
530+
.sleep
531+
.work
532+
.shutdown_deadline_reached
533+
.store(true, Ordering::Release);
534+
}
535+
512536
pub(crate) fn begin_core_dispatched_hook(&self) {
513537
self.0.sleep.work.core_dispatched_hooks.increment();
514538
self.reset_sleep_timer();
@@ -524,17 +548,83 @@ impl ActorContext {
524548
}
525549

526550
pub(crate) async fn teardown_sleep_state(&self) {
527-
self.0
551+
let abort_remaining = self
552+
.0
528553
.sleep
529554
.work
530-
.teardown_started
531-
.store(true, Ordering::Release);
532-
let mut shutdown_tasks = {
533-
let mut guard = self.0.sleep.work.shutdown_tasks.lock();
534-
std::mem::take(&mut *guard)
535-
};
536-
shutdown_tasks.shutdown().await;
537-
*self.0.sleep.work.shutdown_tasks.lock() = shutdown_tasks;
555+
.shutdown_deadline_reached
556+
.swap(false, Ordering::AcqRel);
557+
if abort_remaining {
558+
self.0
559+
.sleep
560+
.work
561+
.teardown_started
562+
.store(true, Ordering::Release);
563+
}
564+
565+
#[cfg(feature = "wasm-runtime")]
566+
{
567+
loop {
568+
let local_shutdown_tasks = {
569+
let mut guard = self.0.sleep.work.local_shutdown_tasks.lock();
570+
let taken = std::mem::take(&mut *guard);
571+
if taken.is_empty() {
572+
self.0
573+
.sleep
574+
.work
575+
.teardown_started
576+
.store(true, Ordering::Release);
577+
return;
578+
}
579+
taken
580+
};
581+
582+
if abort_remaining {
583+
self.0.sleep.work.local_shutdown_tasks.lock().clear();
584+
self.0
585+
.sleep
586+
.work
587+
.teardown_started
588+
.store(true, Ordering::Release);
589+
return;
590+
}
591+
592+
for task in local_shutdown_tasks {
593+
if task.await.is_err() {
594+
tracing::debug!("shutdown task completion dropped during teardown");
595+
}
596+
}
597+
}
598+
}
599+
600+
#[cfg(not(feature = "wasm-runtime"))]
601+
loop {
602+
let mut shutdown_tasks = {
603+
let mut guard = self.0.sleep.work.shutdown_tasks.lock();
604+
let taken = std::mem::take(&mut *guard);
605+
if taken.is_empty() {
606+
self.0
607+
.sleep
608+
.work
609+
.teardown_started
610+
.store(true, Ordering::Release);
611+
return;
612+
}
613+
taken
614+
};
615+
616+
if abort_remaining {
617+
shutdown_tasks.shutdown().await;
618+
} else {
619+
while let Some(result) = shutdown_tasks.join_next().await {
620+
if let Err(error) = result
621+
&& !error.is_cancelled()
622+
{
623+
tracing::error!(?error, "shutdown task join failed during teardown");
624+
}
625+
}
626+
}
627+
}
538628
}
539629

540630
pub(crate) fn sleep_state_config(&self) -> ActorConfig {

0 commit comments

Comments
 (0)