Skip to content

Commit e33e626

Browse files
committed
fix(rivetkit-core): gate startup until runtime is ready
1 parent e8072b7 commit e33e626

7 files changed

Lines changed: 64 additions & 12 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"code": "starting",
3+
"group": "actor",
4+
"message": "Actor is starting."
5+
}

rivetkit-rust/packages/rivetkit-core/src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ use serde::{Deserialize, Serialize};
44
#[derive(RivetError, Debug, Clone, Deserialize, Serialize)]
55
#[error("actor")]
66
pub enum ActorLifecycle {
7+
#[error("starting", "Actor is starting.")]
8+
Starting,
9+
710
#[error("not_ready", "Actor is not ready.")]
811
NotReady,
912

rivetkit-rust/packages/rivetkit-core/src/registry/http.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,26 @@ impl RegistryDispatcher {
1010
actor_id: &str,
1111
request: HttpRequest,
1212
) -> Result<HttpResponse> {
13-
let instance = self.active_actor(actor_id).await?;
1413
if request.path == "/metrics" {
14+
let instance = self.active_actor(actor_id).await?;
1515
return self.handle_metrics_fetch(&instance, &request);
1616
}
17+
1718
let request = build_http_request(request).await?;
19+
let framework_route = framework_http_route(request.uri().path())?;
20+
let instance = match self.active_actor(actor_id).await {
21+
Ok(instance) => instance,
22+
Err(error) => {
23+
if framework_route.is_some() {
24+
return message_boundary_error_response(
25+
request_encoding(request.headers()),
26+
framework_anyhow_status(&error),
27+
error,
28+
);
29+
}
30+
return Ok(inspector_anyhow_response(error));
31+
}
32+
};
1833
if let Some(response) = self.handle_inspector_fetch(&instance, &request).await? {
1934
return Ok(response);
2035
}
@@ -29,7 +44,7 @@ impl RegistryDispatcher {
2944
});
3045
};
3146

32-
if let Some(route) = framework_http_route(request.uri().path())? {
47+
if let Some(route) = framework_route {
3348
let response = self.handle_framework_fetch(&instance, request, route).await;
3449
rearm_sleep_after_request(instance.ctx.clone());
3550
return response;
@@ -899,6 +914,7 @@ mod tests {
899914
use http::StatusCode;
900915
use rivet_error::RivetError;
901916
use serde_json::json;
917+
use vbare::OwnedVersionedData;
902918

903919
#[derive(RivetError)]
904920
#[error("message", "incoming_too_long", "Incoming message too long")]
@@ -1074,8 +1090,6 @@ mod tests {
10741090

10751091
#[test]
10761092
fn message_boundary_error_response_serializes_bare_v3() {
1077-
use vbare::OwnedVersionedData;
1078-
10791093
let response = message_boundary_error_response(
10801094
HttpResponseEncoding::Bare,
10811095
StatusCode::BAD_REQUEST,

rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use crate::actor::task::{
4848
};
4949
use crate::actor::task_types::StopReason;
5050
use crate::engine_process::EngineProcessManager;
51-
use crate::error::ActorRuntime;
51+
use crate::error::{ActorLifecycle as ActorLifecycleError, ActorRuntime};
5252
use crate::inspector::protocol::{
5353
self as inspector_protocol, ServerMessage as InspectorServerMessage,
5454
};
@@ -725,13 +725,31 @@ impl RegistryDispatcher {
725725
async fn active_actor(&self, actor_id: &str) -> Result<Arc<ActorTaskHandle>> {
726726
if let Some(instance) = self.actor_instances.get_async(&actor_id.to_owned()).await {
727727
match instance.get() {
728-
ActorInstanceState::Active(instance) => return Ok(instance.clone()),
728+
ActorInstanceState::Active(instance) => {
729+
let instance = instance.clone();
730+
if instance.ctx.ready() {
731+
return Ok(instance);
732+
}
733+
734+
instance
735+
.ctx
736+
.warn_work_sent_to_stopping_instance("active_actor");
737+
return Err(if instance.ctx.destroy_requested() {
738+
ActorLifecycleError::Destroying.build()
739+
} else {
740+
ActorLifecycleError::Starting.build()
741+
});
742+
}
729743
ActorInstanceState::Stopping(instance) => {
730744
let instance = instance.clone();
731745
instance
732746
.ctx
733747
.warn_work_sent_to_stopping_instance("active_actor");
734-
return Ok(instance);
748+
return Err(if instance.ctx.destroy_requested() {
749+
ActorLifecycleError::Destroying.build()
750+
} else {
751+
ActorLifecycleError::Stopping.build()
752+
});
735753
}
736754
}
737755
}

rivetkit-typescript/packages/rivetkit/src/client/actor-conn.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,8 @@ export class ActorConnRaw {
231231

232232
/**
233233
* If the query is dynamic (getForKey or getOrCreateForKey) and the error
234-
* indicates the previously resolved actor is gone (not_found or destroyed),
234+
* indicates the previously resolved actor is stale (not_found, starting,
235+
* stopping, or destroyed),
235236
* clear the cached actor ID and connection ID so the next operation
236237
* re-resolves to a fresh actor. Returns true if the identity was
237238
* invalidated.

rivetkit-typescript/packages/rivetkit/src/client/actor-handle.ts

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,9 @@ export class ActorHandleRaw {
175175
const invalidated = this.#invalidateResolvedActorId(group, code);
176176
if (invalidated && attempt < maxAttempts - 1) {
177177
useQueryTarget =
178-
code === "stopping" || code.startsWith("destroyed_");
178+
(code === "starting" ||
179+
code === "stopping" ||
180+
code.startsWith("destroyed_"));
179181
if (useQueryTarget) {
180182
await this.#waitForRetryWindow();
181183
}
@@ -344,7 +346,10 @@ export class ActorHandleRaw {
344346

345347
const invalidated = this.#invalidateResolvedActorId(group, code);
346348
if (invalidated && attempt < maxAttempts - 1) {
347-
if (group === "actor" && code === "stopping") {
349+
if (
350+
group === "actor" &&
351+
(code === "starting" || code === "stopping")
352+
) {
348353
useQueryTarget = true;
349354
await new Promise((resolve) => setTimeout(resolve, 100));
350355
}
@@ -398,6 +403,7 @@ export class ActorHandleRaw {
398403

399404
return (
400405
code === "not_found" ||
406+
code === "starting" ||
401407
code === "stopping" ||
402408
code === "destroying" ||
403409
code.startsWith("destroyed_")
@@ -592,7 +598,9 @@ export class ActorHandleRaw {
592598
const invalidated = this.#invalidateResolvedActorId(group, code);
593599
if (invalidated && attempt < maxAttempts - 1) {
594600
useQueryTarget =
595-
code === "stopping" || code.startsWith("destroyed_");
601+
(code === "starting" ||
602+
code === "stopping" ||
603+
code.startsWith("destroyed_"));
596604
if (useQueryTarget) {
597605
await this.#waitForRetryWindow();
598606
}
@@ -662,7 +670,9 @@ export class ActorHandleRaw {
662670
const invalidated = this.#invalidateResolvedActorId(group, code);
663671
if (invalidated && attempt < maxAttempts - 1) {
664672
const useQueryTarget =
665-
code === "stopping" || code.startsWith("destroyed_");
673+
code === "starting" ||
674+
code === "stopping" ||
675+
code.startsWith("destroyed_");
666676
return {
667677
useQueryTarget,
668678
waitForRetryWindow: useQueryTarget,

rivetkit-typescript/packages/rivetkit/src/client/actor-query.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ export function isStaleResolvedActorError(
5050
return (
5151
group === "actor" &&
5252
(code === "not_found" ||
53+
code === "starting" ||
5354
code === "stopping" ||
5455
code.startsWith("destroyed_"))
5556
);

0 commit comments

Comments
 (0)