Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 109 additions & 161 deletions rivetkit-rust/packages/rivetkit-core/src/actor/context.rs

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion rivetkit-rust/packages/rivetkit-core/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub(crate) mod work_registry;
pub use action::ActionDispatchError;
pub use config::{ActionDefinition, ActorConfig, ActorConfigOverrides, CanHibernateWebSocket};
pub use connection::ConnHandle;
pub use context::{ActorContext, KeepAwakeRegion, WebSocketCallbackRegion};
pub use context::{ActorContext, ActorWorkRegion, KeepAwakeRegion, WebSocketCallbackRegion};
pub use factory::{ActorEntryFn, ActorFactory};
pub use kv::Kv;
pub use lifecycle_hooks::{ActorEvents, ActorStart, Reply};
Expand All @@ -41,3 +41,4 @@ pub use task::{
LifecycleEvent, LifecycleState,
};
pub use task_types::{ActorChildOutcome, ShutdownKind, StateMutationReason, UserTaskKind};
pub use work_registry::{ActorWorkKind, ActorWorkPolicy};
169 changes: 151 additions & 18 deletions rivetkit-rust/packages/rivetkit-core/src/actor/sleep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#[cfg(test)]
use std::sync::atomic::AtomicUsize as TestAtomicUsize;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;
#[cfg(not(feature = "wasm-runtime"))]
use tokio::runtime::Handle;
use tokio::sync::Notify;
Expand All @@ -17,12 +18,10 @@
use crate::actor::task_types::ShutdownKind;
#[cfg(feature = "wasm-runtime")]
use crate::actor::work_registry::LocalShutdownTask;
use crate::actor::work_registry::{CountGuard, RegionGuard, WorkRegistry};
use crate::actor::work_registry::{ActorWorkKind, CountGuard, RegionGuard, WorkRegistry};
#[cfg(feature = "wasm-runtime")]
use crate::runtime::RuntimeSpawner;
#[cfg(test)]
use crate::time::sleep_until;
use crate::time::{Instant, sleep};
use crate::time::{Instant, sleep, sleep_until};
#[cfg(test)]
use crate::types::ActorKey;
#[cfg(feature = "wasm-runtime")]
Expand Down Expand Up @@ -113,6 +112,10 @@
"websocket_callback_count",
&self.work.websocket_callback.load(),
)
.field(
"disconnect_callback_count",
&self.work.disconnect_callback.load(),
)
.finish()
}
}
Expand Down Expand Up @@ -381,7 +384,6 @@
}
}

#[cfg(test)]
pub(crate) async fn wait_for_shutdown_tasks(&self, deadline: Instant) -> bool {
loop {
let activity = self.sleep_activity_notify();
Expand Down Expand Up @@ -412,6 +414,15 @@
}
}

pub async fn wait_for_tracked_shutdown_work(&self) -> bool {
let shutdown_deadline = self.shutdown_deadline_token();
let deadline = Instant::now() + Duration::from_secs(60 * 60 * 24 * 365);
tokio::select! {
result = self.wait_for_shutdown_tasks(deadline) => result,
_ = shutdown_deadline.cancelled() => false,
}
}

pub(crate) async fn wait_for_http_requests_drained(&self, deadline: Instant) -> bool {
let Some(counter) = self.http_request_counter() else {
return true;
Expand Down Expand Up @@ -461,6 +472,119 @@
self.0.sleep.work.websocket_callback.load()
}

pub(crate) fn disconnect_callback_region_state(&self) -> RegionGuard {
self.0.sleep.work.disconnect_callback_guard()
}

#[cfg(not(feature = "wasm-runtime"))]
pub(crate) fn spawn_work_inner<F>(&self, kind: ActorWorkKind, fut: F) -> bool
where
F: Future<Output = ()> + Send + 'static,

Check warning on line 482 in rivetkit-rust/packages/rivetkit-core/src/actor/sleep.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/rivetkit-rust/packages/rivetkit-core/src/actor/sleep.rs
{
if Handle::try_current().is_err() {
tracing::warn!(kind = kind.label(), "actor work spawned without tokio runtime");
return false;
}

if self.0.sleep.work.teardown_started.load(Ordering::Acquire) {

Check warning on line 489 in rivetkit-rust/packages/rivetkit-core/src/actor/sleep.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/rivetkit-rust/packages/rivetkit-core/src/actor/sleep.rs
tracing::warn!(kind = kind.label(), "actor work spawned after teardown; aborting immediately");
return false;
}

let policy = kind.policy();
let region = self.begin_work_region(kind);
let ctx = self.clone();
let task = async move {
let _region = region;
if policy.aborts_at_shutdown_deadline {
let shutdown_deadline = ctx.shutdown_deadline_token();
tokio::select! {
_ = fut => {}
_ = shutdown_deadline.cancelled() => {
tracing::warn!(
actor_id = %ctx.actor_id(),
kind = kind.label(),
reason = "shutdown_deadline_elapsed",
"actor work cancelled by shutdown deadline"
);
}
}
} else {
fut.await;
}
ctx.reset_sleep_timer();
}
.in_current_span();
if policy.aborts_at_shutdown_deadline {
self.0.sleep.work.shutdown_tasks.lock().spawn(task);
} else {
self.0
.sleep
.work
.unabortable_shutdown_tasks
.lock()
.spawn(task);
}
self.reset_sleep_timer();
true
}

#[cfg(feature = "wasm-runtime")]
pub(crate) fn spawn_work_inner<F>(&self, kind: ActorWorkKind, fut: F) -> bool
where
F: Future<Output = ()> + 'static,
{

Check warning on line 536 in rivetkit-rust/packages/rivetkit-core/src/actor/sleep.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/rivetkit-rust/packages/rivetkit-core/src/actor/sleep.rs
let mut local_shutdown_tasks = self.0.sleep.work.local_shutdown_tasks.lock();
if self.0.sleep.work.teardown_started.load(Ordering::Acquire) {
tracing::warn!(kind = kind.label(), "actor work spawned after teardown; aborting immediately");
return false;
}

let policy = kind.policy();
let region = self.begin_work_region(kind);
let ctx = self.clone();
let (complete_tx, complete_rx) = futures_oneshot::channel();
let (abort_handle, abort_registration) = AbortHandle::new_pair();
local_shutdown_tasks.push(LocalShutdownTask {
abort_handle,
complete_rx,
aborts_at_shutdown_deadline: policy.aborts_at_shutdown_deadline,
});
drop(local_shutdown_tasks);
let ctx_for_task = ctx.clone();
wasm_bindgen_futures::spawn_local(
async move {
let task = async move {
let _region = region;
if policy.aborts_at_shutdown_deadline {
let shutdown_deadline = ctx_for_task.shutdown_deadline_token();
tokio::select! {
_ = fut => {}
_ = shutdown_deadline.cancelled() => {
tracing::warn!(
actor_id = %ctx_for_task.actor_id(),
kind = kind.label(),
reason = "shutdown_deadline_elapsed",
"actor work cancelled by shutdown deadline"
);
}
}
} else {
fut.await;
}
let _ = complete_tx.send(());
ctx_for_task.reset_sleep_timer();
};
if Abortable::new(task, abort_registration).await.is_err() {
ctx.reset_sleep_timer();
}
}
.in_current_span(),
);
self.reset_sleep_timer();
true
}

#[cfg(not(feature = "wasm-runtime"))]
pub(crate) fn track_shutdown_task<F>(&self, fut: F) -> bool
where
Expand Down Expand Up @@ -519,6 +643,7 @@
local_shutdown_tasks.push(LocalShutdownTask {
abort_handle,
complete_rx,
aborts_at_shutdown_deadline: true,
});
drop(local_shutdown_tasks);
let ctx_for_task = ctx.clone();
Expand Down Expand Up @@ -605,7 +730,9 @@

if abort_remaining {
for task in local_shutdown_tasks {
task.abort_handle.abort();
if task.aborts_at_shutdown_deadline {
task.abort_handle.abort();
}
if task.complete_rx.await.is_err() {
tracing::debug!("aborted shutdown task during teardown");
}
Expand All @@ -628,29 +755,35 @@

#[cfg(not(feature = "wasm-runtime"))]
loop {
let mut shutdown_tasks = {
let mut abortable_shutdown_tasks = {
let mut guard = self.0.sleep.work.shutdown_tasks.lock();
let taken = std::mem::take(&mut *guard);
if taken.is_empty() {
let mut unabortable_guard = self.0.sleep.work.unabortable_shutdown_tasks.lock();
let unabortable_taken = std::mem::take(&mut *unabortable_guard);
if taken.is_empty() && unabortable_taken.is_empty() {
self.0
.sleep
.work
.teardown_started
.store(true, Ordering::Release);
return;
}
taken
(taken, unabortable_taken)
};

if abort_remaining {
shutdown_tasks.shutdown().await;
} else {
while let Some(result) = shutdown_tasks.join_next().await {
if let Err(error) = result
&& !error.is_cancelled()
{
tracing::error!(?error, "shutdown task join failed during teardown");
}
abortable_shutdown_tasks.0.shutdown().await;
while let Some(result) = abortable_shutdown_tasks.0.join_next().await {
if let Err(error) = result
&& !error.is_cancelled()
{
tracing::error!(?error, "shutdown task join failed during teardown");
}
}
while let Some(result) = abortable_shutdown_tasks.1.join_next().await {
if let Err(error) = result
&& !error.is_cancelled()
{
tracing::error!(?error, "shutdown task join failed during teardown");
}
}
}
Expand Down
Loading
Loading