Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions rivetkit-rust/engine/artifacts/errors/actor.starting.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"code": "starting",
"group": "actor",
"message": "Actor is starting."
}
3 changes: 3 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use serde::{Deserialize, Serialize};
#[derive(RivetError, Debug, Clone, Deserialize, Serialize)]
#[error("actor")]
pub enum ActorLifecycle {
#[error("starting", "Actor is starting.")]
Starting,

#[error("not_ready", "Actor is not ready.")]
NotReady,

Expand Down
22 changes: 18 additions & 4 deletions rivetkit-rust/packages/rivetkit-core/src/registry/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,26 @@ impl RegistryDispatcher {
actor_id: &str,
request: HttpRequest,
) -> Result<HttpResponse> {
let instance = self.active_actor(actor_id).await?;
if request.path == "/metrics" {
let instance = self.active_actor(actor_id).await?;
return self.handle_metrics_fetch(&instance, &request);
}

let request = build_http_request(request).await?;
let framework_route = framework_http_route(request.uri().path())?;
let instance = match self.active_actor(actor_id).await {
Ok(instance) => instance,
Err(error) => {
if framework_route.is_some() {
return message_boundary_error_response(
request_encoding(request.headers()),
framework_anyhow_status(&error),
error,
);
}
return Ok(inspector_anyhow_response(error));
}
};
if let Some(response) = self.handle_inspector_fetch(&instance, &request).await? {
return Ok(response);
}
Expand All @@ -29,7 +44,7 @@ impl RegistryDispatcher {
});
};

if let Some(route) = framework_http_route(request.uri().path())? {
if let Some(route) = framework_route {
let response = self.handle_framework_fetch(&instance, request, route).await;
rearm_sleep_after_request(instance.ctx.clone());
return response;
Expand Down Expand Up @@ -899,6 +914,7 @@ mod tests {
use http::StatusCode;
use rivet_error::RivetError;
use serde_json::json;
use vbare::OwnedVersionedData;

#[derive(RivetError)]
#[error("message", "incoming_too_long", "Incoming message too long")]
Expand Down Expand Up @@ -1074,8 +1090,6 @@ mod tests {

#[test]
fn message_boundary_error_response_serializes_bare_v3() {
use vbare::OwnedVersionedData;

let response = message_boundary_error_response(
HttpResponseEncoding::Bare,
StatusCode::BAD_REQUEST,
Expand Down
24 changes: 21 additions & 3 deletions rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::actor::task::{
};
use crate::actor::task_types::StopReason;
use crate::engine_process::EngineProcessManager;
use crate::error::ActorRuntime;
use crate::error::{ActorLifecycle as ActorLifecycleError, ActorRuntime};
use crate::inspector::protocol::{
self as inspector_protocol, ServerMessage as InspectorServerMessage,
};
Expand Down Expand Up @@ -725,13 +725,31 @@ impl RegistryDispatcher {
async fn active_actor(&self, actor_id: &str) -> Result<Arc<ActorTaskHandle>> {
if let Some(instance) = self.actor_instances.get_async(&actor_id.to_owned()).await {
match instance.get() {
ActorInstanceState::Active(instance) => return Ok(instance.clone()),
ActorInstanceState::Active(instance) => {
let instance = instance.clone();
if instance.ctx.ready() {
return Ok(instance);
}

instance
.ctx
.warn_work_sent_to_stopping_instance("active_actor");
return Err(if instance.ctx.destroy_requested() {
ActorLifecycleError::Destroying.build()
} else {
ActorLifecycleError::Starting.build()
});
}
ActorInstanceState::Stopping(instance) => {
let instance = instance.clone();
instance
.ctx
.warn_work_sent_to_stopping_instance("active_actor");
return Ok(instance);
return Err(if instance.ctx.destroy_requested() {
ActorLifecycleError::Destroying.build()
} else {
ActorLifecycleError::Stopping.build()
});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ export class ActorConnRaw {

/**
* If the query is dynamic (getForKey or getOrCreateForKey) and the error
* indicates the previously resolved actor is gone (not_found or destroyed),
* indicates the previously resolved actor is stale (not_found, starting,
* stopping, or destroyed),
* clear the cached actor ID and connection ID so the next operation
* re-resolves to a fresh actor. Returns true if the identity was
* invalidated.
Expand Down
18 changes: 14 additions & 4 deletions rivetkit-typescript/packages/rivetkit/src/client/actor-handle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ export class ActorHandleRaw {
const invalidated = this.#invalidateResolvedActorId(group, code);
if (invalidated && attempt < maxAttempts - 1) {
useQueryTarget =
code === "stopping" || code.startsWith("destroyed_");
(code === "starting" ||
code === "stopping" ||
code.startsWith("destroyed_"));
if (useQueryTarget) {
await this.#waitForRetryWindow();
}
Expand Down Expand Up @@ -344,7 +346,10 @@ export class ActorHandleRaw {

const invalidated = this.#invalidateResolvedActorId(group, code);
if (invalidated && attempt < maxAttempts - 1) {
if (group === "actor" && code === "stopping") {
if (
group === "actor" &&
(code === "starting" || code === "stopping")
) {
useQueryTarget = true;
await new Promise((resolve) => setTimeout(resolve, 100));
}
Expand Down Expand Up @@ -398,6 +403,7 @@ export class ActorHandleRaw {

return (
code === "not_found" ||
code === "starting" ||
code === "stopping" ||
code === "destroying" ||
code.startsWith("destroyed_")
Expand Down Expand Up @@ -592,7 +598,9 @@ export class ActorHandleRaw {
const invalidated = this.#invalidateResolvedActorId(group, code);
if (invalidated && attempt < maxAttempts - 1) {
useQueryTarget =
code === "stopping" || code.startsWith("destroyed_");
(code === "starting" ||
code === "stopping" ||
code.startsWith("destroyed_"));
if (useQueryTarget) {
await this.#waitForRetryWindow();
}
Expand Down Expand Up @@ -662,7 +670,9 @@ export class ActorHandleRaw {
const invalidated = this.#invalidateResolvedActorId(group, code);
if (invalidated && attempt < maxAttempts - 1) {
const useQueryTarget =
code === "stopping" || code.startsWith("destroyed_");
code === "starting" ||
code === "stopping" ||
code.startsWith("destroyed_");
return {
useQueryTarget,
waitForRetryWindow: useQueryTarget,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export function isStaleResolvedActorError(
return (
group === "actor" &&
(code === "not_found" ||
code === "starting" ||
code === "stopping" ||
code.startsWith("destroyed_"))
);
Expand Down
Loading