Skip to content

Commit 13072b7

Browse files
committed
fix(rivetkit-core): gate startup until runtime is ready
1 parent 38f839d commit 13072b7

2 files changed

Lines changed: 39 additions & 7 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/registry/http.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,26 @@ impl RegistryDispatcher {
1010
actor_id: &str,
1111
request: HttpRequest,
1212
) -> Result<HttpResponse> {
13-
let instance = self.active_actor(actor_id).await?;
1413
if request.path == "/metrics" {
14+
let instance = self.active_actor(actor_id).await?;
1515
return self.handle_metrics_fetch(&instance, &request);
1616
}
17+
1718
let request = build_http_request(request).await?;
19+
let framework_route = framework_http_route(request.uri().path())?;
20+
let instance = match self.active_actor(actor_id).await {
21+
Ok(instance) => instance,
22+
Err(error) => {
23+
if framework_route.is_some() {
24+
return message_boundary_error_response(
25+
request_encoding(request.headers()),
26+
framework_anyhow_status(&error),
27+
error,
28+
);
29+
}
30+
return Ok(inspector_anyhow_response(error));
31+
}
32+
};
1833
if let Some(response) = self.handle_inspector_fetch(&instance, &request).await? {
1934
return Ok(response);
2035
}
@@ -29,7 +44,7 @@ impl RegistryDispatcher {
2944
});
3045
};
3146

32-
if let Some(route) = framework_http_route(request.uri().path())? {
47+
if let Some(route) = framework_route {
3348
let response = self.handle_framework_fetch(&instance, request, route).await;
3449
rearm_sleep_after_request(instance.ctx.clone());
3550
return response;
@@ -899,6 +914,7 @@ mod tests {
899914
use http::StatusCode;
900915
use rivet_error::RivetError;
901916
use serde_json::json;
917+
use vbare::OwnedVersionedData;
902918

903919
#[derive(RivetError)]
904920
#[error("message", "incoming_too_long", "Incoming message too long")]
@@ -1074,8 +1090,6 @@ mod tests {
10741090

10751091
#[test]
10761092
fn message_boundary_error_response_serializes_bare_v3() {
1077-
use vbare::OwnedVersionedData;
1078-
10791093
let response = message_boundary_error_response(
10801094
HttpResponseEncoding::Bare,
10811095
StatusCode::BAD_REQUEST,

rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use crate::actor::task::{
4848
};
4949
use crate::actor::task_types::StopReason;
5050
use crate::engine_process::EngineProcessManager;
51-
use crate::error::ActorRuntime;
51+
use crate::error::{ActorLifecycle as ActorLifecycleError, ActorRuntime};
5252
use crate::inspector::protocol::{
5353
self as inspector_protocol, ServerMessage as InspectorServerMessage,
5454
};
@@ -725,13 +725,31 @@ impl RegistryDispatcher {
725725
async fn active_actor(&self, actor_id: &str) -> Result<Arc<ActorTaskHandle>> {
726726
if let Some(instance) = self.actor_instances.get_async(&actor_id.to_owned()).await {
727727
match instance.get() {
728-
ActorInstanceState::Active(instance) => return Ok(instance.clone()),
728+
ActorInstanceState::Active(instance) => {
729+
let instance = instance.clone();
730+
if instance.ctx.ready() {
731+
return Ok(instance);
732+
}
733+
734+
instance
735+
.ctx
736+
.warn_work_sent_to_stopping_instance("active_actor");
737+
return Err(if instance.ctx.destroy_requested() {
738+
ActorLifecycleError::Destroying.build()
739+
} else {
740+
ActorLifecycleError::Stopping.build()
741+
});
742+
}
729743
ActorInstanceState::Stopping(instance) => {
730744
let instance = instance.clone();
731745
instance
732746
.ctx
733747
.warn_work_sent_to_stopping_instance("active_actor");
734-
return Ok(instance);
748+
return Err(if instance.ctx.destroy_requested() {
749+
ActorLifecycleError::Destroying.build()
750+
} else {
751+
ActorLifecycleError::Stopping.build()
752+
});
735753
}
736754
}
737755
}

0 commit comments

Comments
 (0)