Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,10 @@ When the user asks to track something in a note, store it in `.agent/notes/` by
- Do not use em dashes (—). Use periods to separate sentences instead.
- Documenting deltas is not important or useful. A developer who has never worked on the project will not gain extra information if you add a comment stating that something was removed or changed because they don't know what was there before. The only time you would be adding a comment for something NOT being there is if its unintuitive for why its not there in the first place.

### Match statements

- Never use a `_ =>` fall-through arm when matching on a Rust enum (or a TypeScript discriminated union). Enumerate every variant explicitly so adding a new variant later is a compile error instead of a silent behavior change. `_` is fine for `Result`, `Option`, integers, strings, and other open value spaces. `_ => unreachable!()` / `_ => panic!()` are explicit asserts and acceptable.

## Documentation

- If you need to look at the documentation for a package, visit `https://docs.rs/{package-name}`. For example, serde docs live at `https://docs.rs/serde/`.
Expand Down
4 changes: 2 additions & 2 deletions rivetkit-rust/packages/rivetkit-core/src/actor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -669,13 +669,13 @@ impl ActorContext {

pub(crate) fn record_shutdown_wait(
&self,
reason: crate::actor::task_types::StopReason,
reason: crate::actor::task_types::ShutdownKind,
duration: Duration,
) {
self.0.metrics.observe_shutdown_wait(reason, duration);
}

pub(crate) fn record_shutdown_timeout(&self, reason: crate::actor::task_types::StopReason) {
pub(crate) fn record_shutdown_timeout(&self, reason: crate::actor::task_types::ShutdownKind) {
self.0.metrics.inc_shutdown_timeout(reason);
}

Expand Down
8 changes: 4 additions & 4 deletions rivetkit-rust/packages/rivetkit-core/src/actor/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};

use crate::actor::connection::ConnHandle;
use crate::actor::lifecycle_hooks::Reply;
use crate::actor::task_types::StopReason;
use crate::actor::task_types::ShutdownKind;
use crate::error::ProtocolError;
use crate::types::ConnId;
use crate::websocket::WebSocket;
Expand Down Expand Up @@ -308,7 +308,7 @@ pub enum ActorEvent {
reply: Reply<Vec<StateDelta>>,
},
RunGracefulCleanup {
reason: StopReason,
reason: ShutdownKind,
reply: Reply<()>,
},
DisconnectConn {
Expand Down Expand Up @@ -349,8 +349,8 @@ impl ActorEvent {
SerializeStateReason::Inspector => "serialize_state_inspector",
},
Self::RunGracefulCleanup { reason, .. } => match reason {
StopReason::Sleep => "run_sleep_cleanup",
StopReason::Destroy => "run_destroy_cleanup",
ShutdownKind::Sleep => "run_sleep_cleanup",
ShutdownKind::Destroy => "run_destroy_cleanup",
},
Self::DisconnectConn { .. } => "disconnect_conn",
#[cfg(test)]
Expand Down
8 changes: 4 additions & 4 deletions rivetkit-rust/packages/rivetkit-core/src/actor/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use prometheus::{
Opts, Registry, TextEncoder,
};

use crate::actor::task_types::{StateMutationReason, StopReason, UserTaskKind};
use crate::actor::task_types::{ShutdownKind, StateMutationReason, UserTaskKind};

#[derive(Clone)]
pub(crate) struct ActorMetrics {
Expand Down Expand Up @@ -216,7 +216,7 @@ impl ActorMetrics {
for reason in StateMutationReason::ALL {
state_mutation_total.with_label_values(&[reason.as_metric_label()]);
}
for reason in [StopReason::Sleep, StopReason::Destroy] {
for reason in [ShutdownKind::Sleep, ShutdownKind::Destroy] {
shutdown_wait_seconds.with_label_values(&[reason.as_metric_label()]);
shutdown_timeout_total.with_label_values(&[reason.as_metric_label()]);
}
Expand Down Expand Up @@ -397,7 +397,7 @@ impl ActorMetrics {
.observe(duration.as_secs_f64());
}

pub(crate) fn observe_shutdown_wait(&self, reason: StopReason, duration: Duration) {
pub(crate) fn observe_shutdown_wait(&self, reason: ShutdownKind, duration: Duration) {
let Some(inner) = self.inner.as_ref().as_ref() else {
return;
};
Expand All @@ -407,7 +407,7 @@ impl ActorMetrics {
.observe(duration.as_secs_f64());
}

pub(crate) fn inc_shutdown_timeout(&self, reason: StopReason) {
pub(crate) fn inc_shutdown_timeout(&self, reason: ShutdownKind) {
let Some(inner) = self.inner.as_ref().as_ref() else {
return;
};
Expand Down
2 changes: 1 addition & 1 deletion rivetkit-rust/packages/rivetkit-core/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ pub use task::{
ActionDispatchResult, ActorTask, DispatchCommand, HttpDispatchResult, LifecycleCommand,
LifecycleEvent, LifecycleState,
};
pub use task_types::{ActorChildOutcome, StateMutationReason, StopReason, UserTaskKind};
pub use task_types::{ActorChildOutcome, ShutdownKind, StateMutationReason, UserTaskKind};
73 changes: 42 additions & 31 deletions rivetkit-rust/packages/rivetkit-core/src/actor/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use crate::actor::state::{
LAST_PUSHED_ALARM_KEY, PERSIST_DATA_KEY, PersistedActor, decode_last_pushed_alarm,
decode_persisted_actor,
};
use crate::actor::task_types::StopReason;
use crate::actor::task_types::ShutdownKind;
use crate::error::{ActorLifecycle as ActorLifecycleError, ActorRuntime};
use crate::types::{SaveStateOpts, format_actor_key};
use crate::websocket::WebSocket;
Expand Down Expand Up @@ -96,7 +96,7 @@ static SHUTDOWN_CLEANUP_HOOK: OnceLock<Mutex<Option<ShutdownCleanupHook>>> = Onc
pub(crate) struct ShutdownCleanupHookGuard;

#[cfg(test)]
type ShutdownReplyHook = Arc<dyn Fn(&ActorContext, StopReason) + Send + Sync>;
type ShutdownReplyHook = Arc<dyn Fn(&ActorContext, ShutdownKind) + Send + Sync>;

#[cfg(test)]
// Forced-sync: test hooks are installed and cleared from synchronous guard APIs.
Expand Down Expand Up @@ -149,7 +149,7 @@ impl Drop for ShutdownReplyHookGuard {
}

#[cfg(test)]
fn run_shutdown_reply_hook(ctx: &ActorContext, reason: StopReason) {
fn run_shutdown_reply_hook(ctx: &ActorContext, reason: ShutdownKind) {
let hook = SHUTDOWN_REPLY_HOOK
.get_or_init(|| Mutex::new(None))
.lock()
Expand All @@ -164,7 +164,7 @@ pub enum LifecycleCommand {
reply: oneshot::Sender<Result<()>>,
},
Stop {
reason: StopReason,
reason: ShutdownKind,
reply: oneshot::Sender<Result<()>>,
},
FireAlarm {
Expand All @@ -184,7 +184,8 @@ impl LifecycleCommand {
fn stop_reason(&self) -> Option<&'static str> {
match self {
Self::Stop { reason, .. } => Some(shutdown_reason_label(*reason)),
_ => None,
Self::Start { .. } => None,
Self::FireAlarm { .. } => None,
}
}
}
Expand Down Expand Up @@ -340,13 +341,13 @@ impl LifecycleEvent {
}

enum LiveExit {
Shutdown { reason: StopReason },
Shutdown { reason: ShutdownKind },
Terminated,
}

struct SleepGraceState {
deadline: Instant,
reason: StopReason,
reason: ShutdownKind,
}

struct PersistedStartup {
Expand Down Expand Up @@ -686,7 +687,7 @@ impl ActorTask {
}

#[cfg(test)]
async fn handle_stop(&mut self, reason: StopReason) -> Result<()> {
async fn handle_stop(&mut self, reason: ShutdownKind) -> Result<()> {
let (reply_tx, reply_rx) = oneshot::channel();
self.register_shutdown_reply("stop", Some(shutdown_reason_label(reason)), reply_tx);
self.begin_grace(reason).await;
Expand Down Expand Up @@ -747,7 +748,7 @@ impl ActorTask {

async fn begin_stop(
&mut self,
reason: StopReason,
reason: ShutdownKind,
command: &'static str,
command_reason: Option<&'static str>,
reply: oneshot::Sender<Result<()>>,
Expand Down Expand Up @@ -811,7 +812,7 @@ impl ActorTask {
}
}

async fn begin_grace(&mut self, reason: StopReason) {
async fn begin_grace(&mut self, reason: ShutdownKind) {
tracing::debug!(
actor_id = %self.ctx.actor_id(),
reason = shutdown_reason_label(reason),
Expand All @@ -821,17 +822,18 @@ impl ActorTask {
self.ctx.cancel_local_alarm_timeouts();
self.ctx.set_local_alarm_callback(None);
self.transition_to(match reason {
StopReason::Sleep => LifecycleState::SleepGrace,
StopReason::Destroy => LifecycleState::DestroyGrace,
ShutdownKind::Sleep => LifecycleState::SleepGrace,
ShutdownKind::Destroy => LifecycleState::DestroyGrace,
});
self.start_grace(reason);
self.emit_grace_events(reason);
}

fn emit_grace_events(&mut self, reason: StopReason) {
fn emit_grace_events(&mut self, reason: ShutdownKind) {
let conns: Vec<_> = self.ctx.conns().collect();
for conn in conns {
let hibernatable_sleep = matches!(reason, StopReason::Sleep) && conn.is_hibernatable();
let hibernatable_sleep =
matches!(reason, ShutdownKind::Sleep) && conn.is_hibernatable();
if hibernatable_sleep {
self.ctx.request_hibernation_transport_save(conn.id());
continue;
Expand Down Expand Up @@ -1435,10 +1437,10 @@ impl ActorTask {
self.ctx.configure_actor_events(None);
}

fn start_grace(&mut self, reason: StopReason) {
fn start_grace(&mut self, reason: ShutdownKind) {
let grace_period = match reason {
StopReason::Sleep => self.factory.config().effective_sleep_grace_period(),
StopReason::Destroy => self.factory.config().effective_on_destroy_timeout(),
ShutdownKind::Sleep => self.factory.config().effective_sleep_grace_period(),
ShutdownKind::Destroy => self.factory.config().effective_on_destroy_timeout(),
};
self.sleep_deadline = None;
self.ctx.cancel_sleep_timer();
Expand Down Expand Up @@ -1466,7 +1468,13 @@ impl ActorTask {
None
}
LifecycleState::SleepGrace | LifecycleState::DestroyGrace => self.try_finish_grace(),
_ => None,
// Pre-startup, post-finalize, and tear-down states intentionally
// drop activity signals: there is no sleep deadline to reset and no
// grace window left to advance.
LifecycleState::Loading
| LifecycleState::SleepFinalize
| LifecycleState::Destroying
| LifecycleState::Terminated => None,
}
}

Expand Down Expand Up @@ -1532,7 +1540,7 @@ impl ActorTask {
#[cfg(test)]
async fn drain_tracked_work(
&mut self,
reason: StopReason,
reason: ShutdownKind,
phase: &'static str,
deadline: Instant,
) -> bool {
Expand All @@ -1542,7 +1550,7 @@ impl ActorTask {
#[cfg(test)]
async fn drain_tracked_work_with_ctx(
ctx: ActorContext,
reason: StopReason,
reason: ShutdownKind,
phase: &'static str,
deadline: Instant,
) -> bool {
Expand Down Expand Up @@ -1617,7 +1625,7 @@ impl ActorTask {
});
}

fn deliver_shutdown_reply(&mut self, reason: StopReason, result: &Result<()>) {
fn deliver_shutdown_reply(&mut self, reason: ShutdownKind, result: &Result<()>) {
#[cfg(test)]
run_shutdown_reply_hook(&self.ctx, reason);

Expand All @@ -1637,21 +1645,21 @@ impl ActorTask {
);
}

async fn run_shutdown(&mut self, reason: StopReason) -> Result<()> {
async fn run_shutdown(&mut self, reason: ShutdownKind) -> Result<()> {
self.sleep_grace = None;
let started_at = Instant::now();
self.state_save_deadline = None;
self.inspector_serialize_state_deadline = None;
self.sleep_deadline = None;
self.transition_to(match reason {
StopReason::Sleep => LifecycleState::SleepFinalize,
StopReason::Destroy => LifecycleState::Destroying,
ShutdownKind::Sleep => LifecycleState::SleepFinalize,
ShutdownKind::Destroy => LifecycleState::Destroying,
});
self.save_final_state().await?;
self.close_actor_event_channel();
self.join_aborted_run_handle().await;
Self::finish_shutdown_cleanup_with_ctx(self.ctx.clone(), reason).await?;
if matches!(reason, StopReason::Destroy) {
if matches!(reason, ShutdownKind::Destroy) {
self.ctx.mark_destroy_completed();
}
self.ctx.record_shutdown_wait(reason, started_at.elapsed());
Expand Down Expand Up @@ -1700,7 +1708,10 @@ impl ActorTask {
self.ctx.save_state(deltas).await
}

async fn finish_shutdown_cleanup_with_ctx(ctx: ActorContext, reason: StopReason) -> Result<()> {
async fn finish_shutdown_cleanup_with_ctx(
ctx: ActorContext,
reason: ShutdownKind,
) -> Result<()> {
let reason_label = shutdown_reason_label(reason);
let actor_id = ctx.actor_id().to_owned();
ctx.teardown_sleep_state().await;
Expand Down Expand Up @@ -1747,7 +1758,7 @@ impl ActorTask {
// Match the reference TS runtime: keep the persisted engine alarm armed
// across sleep so the next instance still has a wake trigger, but abort
// the local Tokio timer owned by the shutting-down instance.
StopReason::Sleep => {
ShutdownKind::Sleep => {
ctx.cancel_local_alarm_timeouts();
tracing::debug!(
actor_id = %actor_id,
Expand All @@ -1756,7 +1767,7 @@ impl ActorTask {
"actor shutdown cleanup step completed"
);
}
StopReason::Destroy => {
ShutdownKind::Destroy => {
ctx.cancel_driver_alarm_logged();
tracing::debug!(
actor_id = %actor_id,
Expand Down Expand Up @@ -2106,10 +2117,10 @@ impl ActorTask {
}
}

fn shutdown_reason_label(reason: StopReason) -> &'static str {
fn shutdown_reason_label(reason: ShutdownKind) -> &'static str {
match reason {
StopReason::Sleep => "sleep",
StopReason::Destroy => "destroy",
ShutdownKind::Sleep => "sleep",
ShutdownKind::Destroy => "destroy",
}
}

Expand Down
8 changes: 4 additions & 4 deletions rivetkit-rust/packages/rivetkit-core/src/actor/task_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ pub enum LifecycleState {
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StopReason {
pub enum ShutdownKind {
Sleep,
Destroy,
}

impl StopReason {
impl ShutdownKind {
pub(crate) fn as_metric_label(self) -> &'static str {
match self {
StopReason::Sleep => "sleep",
StopReason::Destroy => "destroy",
ShutdownKind::Sleep => "sleep",
ShutdownKind::Destroy => "destroy",
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion rivetkit-rust/packages/rivetkit-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub use actor::task::{
ActionDispatchResult, ActorTask, DispatchCommand, HttpDispatchResult, LifecycleCommand,
LifecycleEvent, LifecycleState,
};
pub use actor::task_types::StopReason;
pub use actor::task_types::ShutdownKind;
pub use error::ActorLifecycle;
pub use inspector::{Inspector, InspectorSnapshot};
pub use registry::{CoreRegistry, ServeConfig};
Expand Down
Loading
Loading