Skip to content

Commit 1dfe1ec

Browse files
committed
fix(rivetkit-napi): plumb ctx through serializeState TSF and stop panicking on runtime_state Ref drop
1 parent 8ec77ea commit 1dfe1ec

5 files changed

Lines changed: 37 additions & 10 deletions

File tree

rivetkit-typescript/packages/rivetkit-napi/index.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ export interface JsActionDefinition {
5050
export interface JsActorConfig {
5151
name?: string
5252
icon?: string
53+
hasDatabase?: boolean
5354
canHibernateWebsocket?: boolean
5455
stateSaveIntervalMs?: number
5556
createStateTimeoutMs?: number

rivetkit-typescript/packages/rivetkit-napi/src/actor_context.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -766,14 +766,30 @@ impl ActorContextShared {
766766
*self.abort_token.lock() = None;
767767
*self.run_restart.lock() = None;
768768
*self.task_sender.lock() = None;
769-
*self.runtime_state.lock() = None;
769+
// napi Ref::unref requires an Env; this function runs on tokio workers
770+
// with no Env available. Dropping without unref panics a debug_assert in
771+
// napi-rs and silently leaks the napi reference slot in release. Forget
772+
// instead so debug matches release behavior. Leak is bounded to one
773+
// JsObject per actor wake cycle until the process exits.
774+
if let Some(old) = self.runtime_state.lock().take() {
775+
std::mem::forget(old);
776+
}
770777
*self.end_reason.lock() = None;
771778
*self.websocket_callback_regions.lock() = BTreeMap::new();
772779
self.next_websocket_callback_region_id
773780
.store(0, Ordering::SeqCst);
774781
}
775782
}
776783

784+
impl Drop for ActorContextShared {
785+
fn drop(&mut self) {
786+
// Same Env-less drop problem as reset_runtime_state. See comment there.
787+
if let Some(old) = self.runtime_state.lock().take() {
788+
std::mem::forget(old);
789+
}
790+
}
791+
}
792+
777793
fn actor_context_shared(actor_id: &str) -> Arc<ActorContextShared> {
778794
ACTOR_CONTEXT_SHARED.retain_sync(|_, shared| shared.strong_count() > 0);
779795

rivetkit-typescript/packages/rivetkit-napi/src/actor_factory.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ pub struct JsActionDefinition {
6565
pub struct JsActorConfig {
6666
pub name: Option<String>,
6767
pub icon: Option<String>,
68+
pub has_database: Option<bool>,
6869
pub can_hibernate_websocket: Option<bool>,
6970
pub state_save_interval_ms: Option<u32>,
7071
pub create_state_timeout_ms: Option<u32>,
@@ -195,6 +196,7 @@ pub(crate) struct WorkflowReplayPayload {
195196

196197
#[derive(Clone)]
197198
pub(crate) struct SerializeStatePayload {
199+
pub(crate) ctx: CoreActorContext,
198200
pub(crate) reason: String,
199201
}
200202

@@ -738,7 +740,7 @@ impl TsfnPayloadSummary for WorkflowReplayPayload {
738740

739741
impl TsfnPayloadSummary for SerializeStatePayload {
740742
fn payload_summary(&self) -> String {
741-
format!("reason={}", self.reason)
743+
format!("actor_id={} reason={}", self.ctx.actor_id(), self.reason)
742744
}
743745
}
744746

@@ -882,7 +884,6 @@ fn build_action_payload(env: &Env, payload: ActionPayload) -> napi::Result<Vec<n
882884
Some(conn) => object.set("conn", ConnHandle::new(conn))?,
883885
None => object.set("conn", env.get_null()?)?,
884886
}
885-
object.set("name", payload.name)?;
886887
object.set("args", Buffer::from(payload.args))?;
887888
match payload.cancel_token {
888889
Some(cancel_token) => object.set("cancelToken", CancellationToken::new(cancel_token))?,
@@ -926,9 +927,10 @@ fn build_serialize_state_payload(
926927
env: &Env,
927928
payload: SerializeStatePayload,
928929
) -> napi::Result<Vec<napi::JsUnknown>> {
929-
Ok(vec![
930-
env.create_string_from_std(payload.reason)?.into_unknown(),
931-
])
930+
let mut object = env.create_object()?;
931+
object.set("ctx", ActorContext::new(payload.ctx))?;
932+
object.set("reason", env.create_string_from_std(payload.reason)?)?;
933+
Ok(vec![object.into_unknown()])
932934
}
933935

934936
fn build_request_object(env: &Env, request: Request) -> napi::Result<JsObject> {
@@ -1033,6 +1035,7 @@ impl From<JsActorConfig> for ActorConfigInput {
10331035
Self {
10341036
name: value.name,
10351037
icon: value.icon,
1038+
has_database: value.has_database,
10361039
can_hibernate_websocket: value.can_hibernate_websocket,
10371040
state_save_interval_ms: value.state_save_interval_ms,
10381041
create_vars_timeout_ms: value.create_vars_timeout_ms,

rivetkit-typescript/packages/rivetkit-napi/src/napi_actor_events.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,7 @@ pub(crate) async fn dispatch_event(
541541
});
542542
}
543543
ActorEvent::SerializeState { reason, reply } => {
544-
reply.send(maybe_serialize(bindings.as_ref(), dirty.as_ref(), reason).await);
544+
reply.send(maybe_serialize(bindings.as_ref(), dirty.as_ref(), ctx, reason).await);
545545
}
546546
ActorEvent::RunGracefulCleanup { reason, reply } => {
547547
let callback = match reason {
@@ -617,12 +617,13 @@ pub(crate) async fn dispatch_event(
617617
async fn maybe_serialize(
618618
bindings: &CallbackBindings,
619619
dirty: &AtomicBool,
620+
ctx: &ActorContext,
620621
reason: SerializeStateReason,
621622
) -> Result<Vec<StateDelta>> {
622623
// The adapter dirty bit is consumed only by persistence-bound serialization.
623624
// Inspector snapshots feed the live overlay and must not steal a pending save.
624625
maybe_serialize_with(bindings, dirty, reason, |bindings, reason| async move {
625-
call_serialize_state(bindings, reason).await
626+
call_serialize_state(bindings, ctx, reason).await
626627
})
627628
.await
628629
}
@@ -646,6 +647,7 @@ where
646647

647648
async fn call_serialize_state(
648649
bindings: &CallbackBindings,
650+
ctx: &ActorContext,
649651
reason: &'static str,
650652
) -> Result<Vec<StateDelta>> {
651653
let callback = bindings
@@ -656,6 +658,7 @@ async fn call_serialize_state(
656658
"serializeState",
657659
callback,
658660
SerializeStatePayload {
661+
ctx: ctx.inner().clone(),
659662
reason: reason.to_owned(),
660663
},
661664
)
@@ -1713,8 +1716,11 @@ mod tests {
17131716
async fn maybe_serialize_skips_save_when_adapter_is_clean() {
17141717
let bindings = empty_bindings();
17151718
let dirty = AtomicBool::new(false);
1719+
let core_ctx =
1720+
rivetkit_core::ActorContext::new("maybe-serialize-clean", "actor", Vec::new(), "local");
1721+
let ctx = ActorContext::new(core_ctx);
17161722

1717-
let deltas = maybe_serialize(&bindings, &dirty, SerializeStateReason::Save)
1723+
let deltas = maybe_serialize(&bindings, &dirty, &ctx, SerializeStateReason::Save)
17181724
.await
17191725
.expect("clean save serialize should not fail");
17201726

rivetkit-typescript/packages/rivetkit/src/registry/native.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ type NativeWebSocketWithEvents = NativeWebSocket & {
118118
};
119119
const textEncoder = new TextEncoder();
120120
const textDecoder = new TextDecoder();
121-
type SerializeStateReason = "save" | "inspector" | "sleep" | "destroy";
121+
type SerializeStateReason = "save" | "inspector";
122122
type NativeOnStateChangeHandler = (
123123
ctx: NativeActorContextAdapter,
124124
state: unknown,
@@ -3046,6 +3046,7 @@ function buildActorConfig(
30463046
return {
30473047
name: options.name as string | undefined,
30483048
icon: options.icon as string | undefined,
3049+
hasDatabase: config.db !== undefined,
30493050
canHibernateWebsocket:
30503051
typeof canHibernate === "boolean" ? canHibernate : undefined,
30513052
stateSaveIntervalMs: options.stateSaveInterval as number | undefined,

0 commit comments

Comments
 (0)