From d45b410eefc9a63c262166283e2e4ad962071862 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Fri, 24 Apr 2026 01:28:24 -0700 Subject: [PATCH] fix(rivetkit-core): gate startup until runtime is ready --- pnpm-lock.yaml | 8 +++++++ .../artifacts/errors/actor.starting.json | 5 ++++ .../packages/rivetkit-core/src/error.rs | 3 +++ .../rivetkit-core/src/registry/http.rs | 22 +++++++++++++---- .../rivetkit-core/src/registry/mod.rs | 24 ++++++++++++++++--- .../rivetkit/src/client/actor-conn.ts | 3 ++- .../rivetkit/src/client/actor-handle.ts | 18 ++++++++++---- .../rivetkit/src/client/actor-query.ts | 1 + 8 files changed, 72 insertions(+), 12 deletions(-) create mode 100644 rivetkit-rust/engine/artifacts/errors/actor.starting.json diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index fa1f9073da..24f6def67b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1659,6 +1659,9 @@ importers: '@rivetkit/rivetkit-napi': specifier: workspace:* version: link:../../rivetkit-typescript/packages/rivetkit-napi + '@rivetkit/sql-loader': + specifier: '*' + version: 2.2.1 ai: specifier: ^4.0.38 version: 4.3.19(react@19.1.0)(zod@3.25.76) @@ -8896,6 +8899,9 @@ packages: resolution: {integrity: sha512-3qndQUQXLdwafMEqfhz24hUtDPcsf1Bu3q52Kb8MqeH8JUh3h6R4HYW3ZJXiQsLcyYyFM68PuIwlLRlg1xDEpg==} engines: {node: ^14.18.0 || >=16.0.0} + '@rivetkit/sql-loader@2.2.1': + resolution: {integrity: sha512-qqykzDGak06VY58o8qh8TH7CL77vIp2FHUJB2ZT78LJ+muRrXiwj/QHeVn9ykWePqCYCD9XP0L6Qd2fmERdXqQ==} + '@rolldown/pluginutils@1.0.0-beta.27': resolution: {integrity: sha512-+d0F4MKMCbeVUJwG96uQ4SgAznZNSq93I3V+9NHA4OpvqG8mRCpGdKmK8l/dl02h2CCDHwW2FqilnTyDcAnqjA==} @@ -23362,6 +23368,8 @@ snapshots: '@rivetkit/bare-ts@0.6.2': {} + '@rivetkit/sql-loader@2.2.1': {} + '@rolldown/pluginutils@1.0.0-beta.27': {} '@rollup/pluginutils@5.3.0(rollup@4.57.1)': diff --git a/rivetkit-rust/engine/artifacts/errors/actor.starting.json b/rivetkit-rust/engine/artifacts/errors/actor.starting.json new file mode 100644 index 0000000000..b9e413c16f --- /dev/null +++ b/rivetkit-rust/engine/artifacts/errors/actor.starting.json @@ -0,0 +1,5 @@ +{ + "code": "starting", + "group": "actor", + "message": "Actor is starting." +} \ No newline at end of file diff --git a/rivetkit-rust/packages/rivetkit-core/src/error.rs b/rivetkit-rust/packages/rivetkit-core/src/error.rs index 1bff222109..c94c3a5f2b 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/error.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/error.rs @@ -4,6 +4,9 @@ use serde::{Deserialize, Serialize}; #[derive(RivetError, Debug, Clone, Deserialize, Serialize)] #[error("actor")] pub enum ActorLifecycle { + #[error("starting", "Actor is starting.")] + Starting, + #[error("not_ready", "Actor is not ready.")] NotReady, diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs index c81e0fa666..2ca2ebe6c8 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs @@ -10,11 +10,26 @@ impl RegistryDispatcher { actor_id: &str, request: HttpRequest, ) -> Result { - let instance = self.active_actor(actor_id).await?; if request.path == "/metrics" { + let instance = self.active_actor(actor_id).await?; return self.handle_metrics_fetch(&instance, &request); } + let request = build_http_request(request).await?; + let framework_route = framework_http_route(request.uri().path())?; + let instance = match self.active_actor(actor_id).await { + Ok(instance) => instance, + Err(error) => { + if framework_route.is_some() { + return message_boundary_error_response( + request_encoding(request.headers()), + framework_anyhow_status(&error), + error, + ); + } + return Ok(inspector_anyhow_response(error)); + } + }; if let Some(response) = self.handle_inspector_fetch(&instance, &request).await? { return Ok(response); } @@ -29,7 +44,7 @@ impl RegistryDispatcher { }); }; - if let Some(route) = framework_http_route(request.uri().path())? { + if let Some(route) = framework_route { let response = self.handle_framework_fetch(&instance, request, route).await; rearm_sleep_after_request(instance.ctx.clone()); return response; @@ -899,6 +914,7 @@ mod tests { use http::StatusCode; use rivet_error::RivetError; use serde_json::json; + use vbare::OwnedVersionedData; #[derive(RivetError)] #[error("message", "incoming_too_long", "Incoming message too long")] @@ -1074,8 +1090,6 @@ mod tests { #[test] fn message_boundary_error_response_serializes_bare_v3() { - use vbare::OwnedVersionedData; - let response = message_boundary_error_response( HttpResponseEncoding::Bare, StatusCode::BAD_REQUEST, diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs index ace1536eaa..8034d5abc7 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs @@ -48,7 +48,7 @@ use crate::actor::task::{ }; use crate::actor::task_types::StopReason; use crate::engine_process::EngineProcessManager; -use crate::error::ActorRuntime; +use crate::error::{ActorLifecycle as ActorLifecycleError, ActorRuntime}; use crate::inspector::protocol::{ self as inspector_protocol, ServerMessage as InspectorServerMessage, }; @@ -725,13 +725,31 @@ impl RegistryDispatcher { async fn active_actor(&self, actor_id: &str) -> Result> { if let Some(instance) = self.actor_instances.get_async(&actor_id.to_owned()).await { match instance.get() { - ActorInstanceState::Active(instance) => return Ok(instance.clone()), + ActorInstanceState::Active(instance) => { + let instance = instance.clone(); + if instance.ctx.ready() { + return Ok(instance); + } + + instance + .ctx + .warn_work_sent_to_stopping_instance("active_actor"); + return Err(if instance.ctx.destroy_requested() { + ActorLifecycleError::Destroying.build() + } else { + ActorLifecycleError::Starting.build() + }); + } ActorInstanceState::Stopping(instance) => { let instance = instance.clone(); instance .ctx .warn_work_sent_to_stopping_instance("active_actor"); - return Ok(instance); + return Err(if instance.ctx.destroy_requested() { + ActorLifecycleError::Destroying.build() + } else { + ActorLifecycleError::Stopping.build() + }); } } } diff --git a/rivetkit-typescript/packages/rivetkit/src/client/actor-conn.ts b/rivetkit-typescript/packages/rivetkit/src/client/actor-conn.ts index 93488b3978..2498a40607 100644 --- a/rivetkit-typescript/packages/rivetkit/src/client/actor-conn.ts +++ b/rivetkit-typescript/packages/rivetkit/src/client/actor-conn.ts @@ -231,7 +231,8 @@ export class ActorConnRaw { /** * If the query is dynamic (getForKey or getOrCreateForKey) and the error - * indicates the previously resolved actor is gone (not_found or destroyed), + * indicates the previously resolved actor is stale (not_found, starting, + * stopping, or destroyed), * clear the cached actor ID and connection ID so the next operation * re-resolves to a fresh actor. Returns true if the identity was * invalidated. diff --git a/rivetkit-typescript/packages/rivetkit/src/client/actor-handle.ts b/rivetkit-typescript/packages/rivetkit/src/client/actor-handle.ts index 9f21a3d4a5..4b4282db29 100644 --- a/rivetkit-typescript/packages/rivetkit/src/client/actor-handle.ts +++ b/rivetkit-typescript/packages/rivetkit/src/client/actor-handle.ts @@ -175,7 +175,9 @@ export class ActorHandleRaw { const invalidated = this.#invalidateResolvedActorId(group, code); if (invalidated && attempt < maxAttempts - 1) { useQueryTarget = - code === "stopping" || code.startsWith("destroyed_"); + (code === "starting" || + code === "stopping" || + code.startsWith("destroyed_")); if (useQueryTarget) { await this.#waitForRetryWindow(); } @@ -344,7 +346,10 @@ export class ActorHandleRaw { const invalidated = this.#invalidateResolvedActorId(group, code); if (invalidated && attempt < maxAttempts - 1) { - if (group === "actor" && code === "stopping") { + if ( + group === "actor" && + (code === "starting" || code === "stopping") + ) { useQueryTarget = true; await new Promise((resolve) => setTimeout(resolve, 100)); } @@ -398,6 +403,7 @@ export class ActorHandleRaw { return ( code === "not_found" || + code === "starting" || code === "stopping" || code === "destroying" || code.startsWith("destroyed_") @@ -592,7 +598,9 @@ export class ActorHandleRaw { const invalidated = this.#invalidateResolvedActorId(group, code); if (invalidated && attempt < maxAttempts - 1) { useQueryTarget = - code === "stopping" || code.startsWith("destroyed_"); + (code === "starting" || + code === "stopping" || + code.startsWith("destroyed_")); if (useQueryTarget) { await this.#waitForRetryWindow(); } @@ -662,7 +670,9 @@ export class ActorHandleRaw { const invalidated = this.#invalidateResolvedActorId(group, code); if (invalidated && attempt < maxAttempts - 1) { const useQueryTarget = - code === "stopping" || code.startsWith("destroyed_"); + code === "starting" || + code === "stopping" || + code.startsWith("destroyed_"); return { useQueryTarget, waitForRetryWindow: useQueryTarget, diff --git a/rivetkit-typescript/packages/rivetkit/src/client/actor-query.ts b/rivetkit-typescript/packages/rivetkit/src/client/actor-query.ts index 20a40da07f..d1b336c915 100644 --- a/rivetkit-typescript/packages/rivetkit/src/client/actor-query.ts +++ b/rivetkit-typescript/packages/rivetkit/src/client/actor-query.ts @@ -50,6 +50,7 @@ export function isStaleResolvedActorError( return ( group === "actor" && (code === "not_found" || + code === "starting" || code === "stopping" || code.startsWith("destroyed_")) );