Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,13 @@ When the user asks to track something in a note, store it in `.agent/notes/` by
- Reserve `tokio::time::sleep` for: per-call timeouts via `tokio::select!`, retry/reconnect backoff, deliberate debounce windows, or `sleep_until(deadline)` arms in an event-select loop. If it is inside a `loop { check; sleep }` body, it is polling and should be event-driven instead.
- Never add unexplained wall-clock defers like `sleep(1ms)` to decouple a spawn from its caller. Use `tokio::task::yield_now().await` or rely on the spawn itself.

## Memory Leaks

- Never call `Box::leak` inside a per-request, per-error, or per-call code path. If the leak is for a `'static` reference required by an upstream API (e.g. `RivetErrorSchema`), intern the leaked value through a process-global `LazyLock<scc::HashMap<Key, &'static T>>` keyed on its identity so each unique value is leaked at most once. Examples: `BRIDGE_RIVET_ERROR_SCHEMAS` in `rivetkit-typescript/packages/rivetkit-napi/src/actor_factory.rs`.
- If every field in a leaked struct is a compile-time constant, use a `static`/`const` instead of `Box::leak(Box::new(...))`.
- `std::mem::forget` is only acceptable when an FFI handle cannot be dropped in the current context (e.g. napi `Ref::unref` requires an `Env`). Document the constraint inline and ensure the leak is bounded per actor/connection lifetime, not per call. Prefer routing the drop through an Env-bearing thread when possible.
- Spawned futures that capture JS callbacks or other heavy resources must have a guaranteed completion path (e.g. a `CancellationToken` whose clones are guaranteed to drop). A `spawn_local(async move { token.cancelled().await; ... })` only drains if every clone of the token is dropped or cancelled.

## Async Rust Locks

- Async Rust code defaults to `tokio::sync::Mutex` / `tokio::sync::RwLock`. Do not use `std::sync::Mutex` / `std::sync::RwLock`.
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions docs-internal/engine/napi-bridge.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ Rules for `rivetkit-typescript/packages/rivetkit-napi/`. The bridge is pure plum
- Receive-loop `SerializeState` handling stays inline in `napi_actor_events.rs`, reuses the shared `state_deltas_from_payload(...)` converter from `actor_context.rs`, and only cancels the adapter abort token on `Destroy` or final adapter teardown, not on `Sleep`.
- Receive-loop NAPI optional callbacks preserve the TypeScript runtime defaults: missing `onBeforeSubscribe` allows the subscription, missing workflow callbacks reply `None`, and missing connection lifecycle hooks still accept the connection while leaving the existing empty conn state untouched.

## Runtime-state reference cleanup

- `ActorContextShared::runtime_state` stores a N-API `Ref<()>` for the JS-only actor runtime state bag. `Ref::unref(env)` and reference deletion require an `Env`, but `reset_runtime_state()` runs from receive-loop worker paths and `Drop for ActorContextShared` may run without an active JS callback frame.
- The current `mem::forget` fallback in `actor_context.rs` keeps debug and release behavior aligned when no `Env` is available, but it leaks one JS object reference per actor wake cycle that created runtime state.
- The intended fix is to create an actor-shared cleanup `ThreadsafeFunction` the first time `runtime_state(env)` has an `Env`. Stale `Ref<()>` values should be wrapped in a payload whose `Drop` forgets the reference only if it was never successfully unreffed, then queued to that TSF from `reset_runtime_state()` and `Drop`.
- The TSF callback must run on the JS thread, call `ref.unref(ctx.env)`, and avoid invoking user callbacks. The TSF itself should be unreffed from the event loop so it does not keep Node alive.
- Shutdown is the hard edge: if the TSF is closing or Node supplies a null `Env` during addon teardown, the payload must fall back to the existing bounded process-lifetime leak instead of dropping a live `Ref<()>` and tripping napi-rs debug assertions.
- Before replacing the fallback, add a NAPI integration test that creates runtime state across many actor wake/destroy cycles, waits for the cleanup TSF to drain, and verifies native reference counts return to zero.

## Cancellation bridging

- For non-idempotent native waits like `queue.enqueueAndWait()`, bridge JS `AbortSignal` through a standalone native `CancellationToken`. Timeout-slicing is only safe for receive-style polling calls like `waitForNames()`.
Expand Down
5 changes: 5 additions & 0 deletions engine/artifacts/errors/wasm.invalid_config.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

86 changes: 86 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
## Module layout

- Actor subsystem implementations belong under `src/actor/`; keep root module aliases only for compatibility with existing public callers.
- Public HTTP status promotion for bridged runtime errors belongs in `src/error.rs::public_error_status_code`; NAPI and wasm adapters should call core instead of duplicating mappings.

## Sleep state invariants

- Any mutation that changes a `can_sleep` input must call `ActorContext::reset_sleep_timer()` so the `ActorTask` sleep deadline is re-evaluated. Inputs are: `ready`/`started`, `no_sleep`, `active_http_request_count`, `sleep_keep_awake_count`, `sleep_internal_keep_awake_count`, `pending_disconnect_count`, `conns()`, and `websocket_callback_count`. Missing this call leaves the sleep timer armed against stale state and triggers the `"sleep idle deadline elapsed but actor stayed awake"` warning on the next tick.
- `ActorContext::set_prevent_sleep(...)` / `prevent_sleep()` are deprecated no-ops kept for NAPI bridge compatibility. Use `keep_awake(future)` (holds counter while awaited) or `wait_until(future)` (tracked shutdown task) instead. Do not reintroduce a `prevent_sleep` field, a `CanSleep::PreventSleep` variant, or branches that read it.
- Runtime-owned promises that must drain during shutdown should use `ActorContext::register_task(...)`, not public `wait_until(...)`, so metrics and runtime intent stay distinct. Registered tasks must race user work against `shutdown_deadline_token()` so shutdown cannot hang forever.
- `ctx.sleep()` and `ctx.destroy()` return `Result<()>`. They error with `ActorLifecycleError::Starting` when called before startup completes and `ActorLifecycleError::Stopping` if the requested flag has already been set this generation (atomic `swap(true, ...)`). Internal idle-timer paths log and suppress the already-requested error.
- The grace deadline path (`on_sleep_grace_deadline`) aborts the user `run` handle and cancels `shutdown_deadline_token()`. Foreign-runtime adapters running `onSleep` / `onDestroy` must observe that token via `tokio::select!` so SQLite teardown does not race user cleanup work.
- Counter `register_zero_notify(&idle_notify)` hooks only drive shutdown drain waits. They are not a substitute for the activity-dirty notification, so any new sleep-affecting counter must also notify on transitions that change `can_sleep`.
Expand Down
36 changes: 36 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/src/actor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@
actor_events: RwLock<Option<mpsc::UnboundedSender<ActorEvent>>>,
pub(super) lifecycle_events: RwLock<Option<mpsc::Sender<LifecycleEvent>>>,
hibernated_connection_liveness_override: RwLock<Option<BTreeSet<(Vec<u8>, Vec<u8>)>>>,
pub(super) lifecycle_event_inbox_capacity: usize,

Check warning on line 148 in rivetkit-rust/packages/rivetkit-core/src/actor/context.rs

View workflow job for this annotation

GitHub Actions / Build rivetkit-wasm

field `lifecycle_event_inbox_capacity` is never read
pub(super) metrics: ActorMetrics,
diagnostics: ActorDiagnostics,
actor_id: String,
Expand Down Expand Up @@ -552,6 +552,14 @@
});
}

#[cfg(not(feature = "wasm-runtime"))]
pub fn register_task(&self, future: impl Future<Output = ()> + Send + 'static) {
let ctx = self.clone();
self.track_shutdown_task(async move {
Self::run_registered_task(ctx, future).await;
});
}

#[cfg(feature = "wasm-runtime")]
pub fn wait_until(&self, future: impl Future<Output = ()> + 'static) {
let counter = self.0.sleep.work.shutdown_counter.clone();
Expand All @@ -568,6 +576,34 @@
});
}

#[cfg(feature = "wasm-runtime")]
pub fn register_task(&self, future: impl Future<Output = ()> + 'static) {
let ctx = self.clone();
self.track_shutdown_task(async move {
Self::run_registered_task(ctx, future).await;
});
}

async fn run_registered_task<F>(ctx: ActorContext, future: F)
where
F: Future<Output = ()>,
{
let shutdown_deadline = ctx.shutdown_deadline_token();
ctx.record_user_task_started(UserTaskKind::RegisteredTask);
let started_at = Instant::now();
tokio::select! {
_ = future => {}
_ = shutdown_deadline.cancelled() => {
tracing::warn!(
actor_id = %ctx.actor_id(),
reason = "shutdown_deadline_elapsed",
"registered task cancelled by shutdown deadline"
);
}
}
ctx.record_user_task_finished(UserTaskKind::RegisteredTask, started_at.elapsed());
}

pub async fn keep_awake<F>(&self, future: F) -> F::Output
where
F: Future,
Expand Down Expand Up @@ -1238,7 +1274,7 @@
self.reset_sleep_timer();
}

pub(crate) fn sleep_config(&self) -> ActorConfig {

Check warning on line 1277 in rivetkit-rust/packages/rivetkit-core/src/actor/context.rs

View workflow job for this annotation

GitHub Actions / Build rivetkit-wasm

method `sleep_config` is never used
self.sleep_state_config()
}

Expand Down
5 changes: 4 additions & 1 deletion rivetkit-rust/packages/rivetkit-core/src/actor/task_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ pub enum UserTaskKind {
ScheduledAction,
DisconnectCallback,
WaitUntil,
RegisteredTask,
SleepFinalize,
DestroyRequest,
}

impl UserTaskKind {
pub(crate) const ALL: [Self; 10] = [
pub(crate) const ALL: [Self; 11] = [
Self::Action,
Self::Http,
Self::WebSocketLifetime,
Expand All @@ -53,6 +54,7 @@ impl UserTaskKind {
Self::ScheduledAction,
Self::DisconnectCallback,
Self::WaitUntil,
Self::RegisteredTask,
Self::SleepFinalize,
Self::DestroyRequest,
];
Expand All @@ -67,6 +69,7 @@ impl UserTaskKind {
Self::ScheduledAction => "scheduled_action",
Self::DisconnectCallback => "disconnect_callback",
Self::WaitUntil => "wait_until",
Self::RegisteredTask => "registered_task",
Self::SleepFinalize => "sleep_finalize",
Self::DestroyRequest => "destroy_request",
}
Expand Down
23 changes: 23 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,29 @@
use rivet_error::*;
use serde::{Deserialize, Serialize};

pub fn public_error_status_code(group: &str, code: &str) -> Option<u16> {
match (group, code) {
("auth", "forbidden") => Some(403),
("actor", "action_not_found") => Some(404),
("actor", "action_timed_out") => Some(408),
("actor", "aborted") => Some(400),
("message", "incoming_too_long" | "outgoing_too_long") => Some(400),
(
"queue",
"full"
| "message_too_large"
| "message_invalid"
| "invalid_payload"
| "invalid_completion_payload"
| "already_completed"
| "previous_message_not_completed"
| "complete_not_configured"
| "timed_out",
) => Some(400),
_ => None,
}
}

#[derive(RivetError, Debug, Clone, Deserialize, Serialize)]
#[error("actor")]
pub enum ActorLifecycle {
Expand Down
Loading
Loading