Skip to content

Commit 8bac2e2

Browse files
committed
fix(rivetkit-wasm): fix mem leaks
1 parent 4bb9291 commit 8bac2e2

30 files changed

Lines changed: 2312 additions & 832 deletions

File tree

CLAUDE.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,13 @@ When the user asks to track something in a note, store it in `.agent/notes/` by
215215
- Reserve `tokio::time::sleep` for: per-call timeouts via `tokio::select!`, retry/reconnect backoff, deliberate debounce windows, or `sleep_until(deadline)` arms in an event-select loop. If it is inside a `loop { check; sleep }` body, it is polling and should be event-driven instead.
216216
- Never add unexplained wall-clock defers like `sleep(1ms)` to decouple a spawn from its caller. Use `tokio::task::yield_now().await` or rely on the spawn itself.
217217

218+
## Memory Leaks
219+
220+
- Never call `Box::leak` inside a per-request, per-error, or per-call code path. If the leak is for a `'static` reference required by an upstream API (e.g. `RivetErrorSchema`), intern the leaked value through a process-global `LazyLock<scc::HashMap<Key, &'static T>>` keyed on its identity so each unique value is leaked at most once. Examples: `BRIDGE_RIVET_ERROR_SCHEMAS` in `rivetkit-typescript/packages/rivetkit-napi/src/actor_factory.rs`.
221+
- If every field in a leaked struct is a compile-time constant, use a `static`/`const` instead of `Box::leak(Box::new(...))`.
222+
- `std::mem::forget` is only acceptable when an FFI handle cannot be dropped in the current context (e.g. napi `Ref::unref` requires an `Env`). Document the constraint inline and ensure the leak is bounded per actor/connection lifetime, not per call. Prefer routing the drop through an Env-bearing thread when possible.
223+
- Spawned futures that capture JS callbacks or other heavy resources must have a guaranteed completion path (e.g. a `CancellationToken` whose clones are guaranteed to drop). A `spawn_local(async move { token.cancelled().await; ... })` only drains if every clone of the token is dropped or cancelled.
224+
218225
## Async Rust Locks
219226

220227
- Async Rust code defaults to `tokio::sync::Mutex` / `tokio::sync::RwLock`. Do not use `std::sync::Mutex` / `std::sync::RwLock`.

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docs-internal/engine/napi-bridge.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,15 @@ Rules for `rivetkit-typescript/packages/rivetkit-napi/`. The bridge is pure plum
3434
- Receive-loop `SerializeState` handling stays inline in `napi_actor_events.rs`, reuses the shared `state_deltas_from_payload(...)` converter from `actor_context.rs`, and only cancels the adapter abort token on `Destroy` or final adapter teardown, not on `Sleep`.
3535
- Receive-loop NAPI optional callbacks preserve the TypeScript runtime defaults: missing `onBeforeSubscribe` allows the subscription, missing workflow callbacks reply `None`, and missing connection lifecycle hooks still accept the connection while leaving the existing empty conn state untouched.
3636

37+
## Runtime-state reference cleanup
38+
39+
- `ActorContextShared::runtime_state` stores a N-API `Ref<()>` for the JS-only actor runtime state bag. `Ref::unref(env)` and reference deletion require an `Env`, but `reset_runtime_state()` runs from receive-loop worker paths and `Drop for ActorContextShared` may run without an active JS callback frame.
40+
- The current `mem::forget` fallback in `actor_context.rs` keeps debug and release behavior aligned when no `Env` is available, but it leaks one JS object reference per actor wake cycle that created runtime state.
41+
- The intended fix is to create an actor-shared cleanup `ThreadsafeFunction` the first time `runtime_state(env)` has an `Env`. Stale `Ref<()>` values should be wrapped in a payload whose `Drop` forgets the reference only if it was never successfully unreffed, then queued to that TSF from `reset_runtime_state()` and `Drop`.
42+
- The TSF callback must run on the JS thread, call `ref.unref(ctx.env)`, and avoid invoking user callbacks. The TSF itself should be unreffed from the event loop so it does not keep Node alive.
43+
- Shutdown is the hard edge: if the TSF is closing or Node supplies a null `Env` during addon teardown, the payload must fall back to the existing bounded process-lifetime leak instead of dropping a live `Ref<()>` and tripping napi-rs debug assertions.
44+
- Before replacing the fallback, add a NAPI integration test that creates runtime state across many actor wake/destroy cycles, waits for the cleanup TSF to drain, and verifies native reference counts return to zero.
45+
3746
## Cancellation bridging
3847

3948
- For non-idempotent native waits like `queue.enqueueAndWait()`, bridge JS `AbortSignal` through a standalone native `CancellationToken`. Timeout-slicing is only safe for receive-style polling calls like `waitForNames()`.

pnpm-lock.yaml

Lines changed: 86 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rivetkit-rust/packages/rivetkit-core/CLAUDE.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
## Module layout
44

55
- Actor subsystem implementations belong under `src/actor/`; keep root module aliases only for compatibility with existing public callers.
6+
- Public HTTP status promotion for bridged runtime errors belongs in `src/error.rs::public_error_status_code`; NAPI and wasm adapters should call core instead of duplicating mappings.
67

78
## Sleep state invariants
89

910
- Any mutation that changes a `can_sleep` input must call `ActorContext::reset_sleep_timer()` so the `ActorTask` sleep deadline is re-evaluated. Inputs are: `ready`/`started`, `no_sleep`, `active_http_request_count`, `sleep_keep_awake_count`, `sleep_internal_keep_awake_count`, `pending_disconnect_count`, `conns()`, and `websocket_callback_count`. Missing this call leaves the sleep timer armed against stale state and triggers the `"sleep idle deadline elapsed but actor stayed awake"` warning on the next tick.
1011
- `ActorContext::set_prevent_sleep(...)` / `prevent_sleep()` are deprecated no-ops kept for NAPI bridge compatibility. Use `keep_awake(future)` (holds counter while awaited) or `wait_until(future)` (tracked shutdown task) instead. Do not reintroduce a `prevent_sleep` field, a `CanSleep::PreventSleep` variant, or branches that read it.
12+
- Runtime-owned promises that must drain during shutdown should use `ActorContext::register_task(...)`, not public `wait_until(...)`, so metrics and runtime intent stay distinct. Registered tasks must race user work against `shutdown_deadline_token()` so shutdown cannot hang forever.
1113
- `ctx.sleep()` and `ctx.destroy()` return `Result<()>`. They error with `ActorLifecycleError::Starting` when called before startup completes and `ActorLifecycleError::Stopping` if the requested flag has already been set this generation (atomic `swap(true, ...)`). Internal idle-timer paths log and suppress the already-requested error.
1214
- The grace deadline path (`on_sleep_grace_deadline`) aborts the user `run` handle and cancels `shutdown_deadline_token()`. Foreign-runtime adapters running `onSleep` / `onDestroy` must observe that token via `tokio::select!` so SQLite teardown does not race user cleanup work.
1315
- Counter `register_zero_notify(&idle_notify)` hooks only drive shutdown drain waits. They are not a substitute for the activity-dirty notification, so any new sleep-affecting counter must also notify on transitions that change `can_sleep`.

rivetkit-rust/packages/rivetkit-core/src/actor/context.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,14 @@ impl ActorContext {
552552
});
553553
}
554554

555+
#[cfg(not(feature = "wasm-runtime"))]
556+
pub fn register_task(&self, future: impl Future<Output = ()> + Send + 'static) {
557+
let ctx = self.clone();
558+
self.track_shutdown_task(async move {
559+
Self::run_registered_task(ctx, future).await;
560+
});
561+
}
562+
555563
#[cfg(feature = "wasm-runtime")]
556564
pub fn wait_until(&self, future: impl Future<Output = ()> + 'static) {
557565
let counter = self.0.sleep.work.shutdown_counter.clone();
@@ -568,6 +576,34 @@ impl ActorContext {
568576
});
569577
}
570578

579+
#[cfg(feature = "wasm-runtime")]
580+
pub fn register_task(&self, future: impl Future<Output = ()> + 'static) {
581+
let ctx = self.clone();
582+
self.track_shutdown_task(async move {
583+
Self::run_registered_task(ctx, future).await;
584+
});
585+
}
586+
587+
async fn run_registered_task<F>(ctx: ActorContext, future: F)
588+
where
589+
F: Future<Output = ()>,
590+
{
591+
let shutdown_deadline = ctx.shutdown_deadline_token();
592+
ctx.record_user_task_started(UserTaskKind::RegisteredTask);
593+
let started_at = Instant::now();
594+
tokio::select! {
595+
_ = future => {}
596+
_ = shutdown_deadline.cancelled() => {
597+
tracing::warn!(
598+
actor_id = %ctx.actor_id(),
599+
reason = "shutdown_deadline_elapsed",
600+
"registered task cancelled by shutdown deadline"
601+
);
602+
}
603+
}
604+
ctx.record_user_task_finished(UserTaskKind::RegisteredTask, started_at.elapsed());
605+
}
606+
571607
pub async fn keep_awake<F>(&self, future: F) -> F::Output
572608
where
573609
F: Future,

rivetkit-rust/packages/rivetkit-core/src/actor/task_types.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,13 @@ pub enum UserTaskKind {
3939
ScheduledAction,
4040
DisconnectCallback,
4141
WaitUntil,
42+
RegisteredTask,
4243
SleepFinalize,
4344
DestroyRequest,
4445
}
4546

4647
impl UserTaskKind {
47-
pub(crate) const ALL: [Self; 10] = [
48+
pub(crate) const ALL: [Self; 11] = [
4849
Self::Action,
4950
Self::Http,
5051
Self::WebSocketLifetime,
@@ -53,6 +54,7 @@ impl UserTaskKind {
5354
Self::ScheduledAction,
5455
Self::DisconnectCallback,
5556
Self::WaitUntil,
57+
Self::RegisteredTask,
5658
Self::SleepFinalize,
5759
Self::DestroyRequest,
5860
];
@@ -67,6 +69,7 @@ impl UserTaskKind {
6769
Self::ScheduledAction => "scheduled_action",
6870
Self::DisconnectCallback => "disconnect_callback",
6971
Self::WaitUntil => "wait_until",
72+
Self::RegisteredTask => "registered_task",
7073
Self::SleepFinalize => "sleep_finalize",
7174
Self::DestroyRequest => "destroy_request",
7275
}

rivetkit-rust/packages/rivetkit-core/src/error.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,29 @@
11
use rivet_error::*;
22
use serde::{Deserialize, Serialize};
33

4+
pub fn public_error_status_code(group: &str, code: &str) -> Option<u16> {
5+
match (group, code) {
6+
("auth", "forbidden") => Some(403),
7+
("actor", "action_not_found") => Some(404),
8+
("actor", "action_timed_out") => Some(408),
9+
("actor", "aborted") => Some(400),
10+
("message", "incoming_too_long" | "outgoing_too_long") => Some(400),
11+
(
12+
"queue",
13+
"full"
14+
| "message_too_large"
15+
| "message_invalid"
16+
| "invalid_payload"
17+
| "invalid_completion_payload"
18+
| "already_completed"
19+
| "previous_message_not_completed"
20+
| "complete_not_configured"
21+
| "timed_out",
22+
) => Some(400),
23+
_ => None,
24+
}
25+
}
26+
427
#[derive(RivetError, Debug, Clone, Deserialize, Serialize)]
528
#[error("actor")]
629
pub enum ActorLifecycle {

0 commit comments

Comments
 (0)