70 changes: 39 additions & 31 deletions engine/sdks/rust/envoy-client/src/actor.rs
@@ -9,6 +9,7 @@ use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::task::{JoinError, JoinSet};
use tracing::Instrument;

use crate::config::{HttpRequest, HttpResponse, WebSocketMessage};
use crate::connection::ws_send;
@@ -144,6 +145,14 @@ pub fn create_actor(
(tx, active_http_request_count)
}

#[tracing::instrument(
skip_all,
fields(
actor_id = %actor_id,
generation = generation,
actor_key = %config.key.as_deref().unwrap_or(""),
),
)]
async fn actor_inner(
shared: Arc<SharedContext>,
actor_id: String,
@@ -194,7 +203,7 @@ async fn actor_inner(
.await;

if let Err(error) = start_result {
tracing::error!(actor_id = %ctx.actor_id, ?error, "actor start failed");
tracing::error!(?error, "actor start failed");
send_event(
&mut ctx,
protocol::Event::EventActorStateUpdate(protocol::EventActorStateUpdate {
@@ -209,7 +218,7 @@

if let Some(meta_entries) = handle.take_pending_hibernation_restore(&actor_id) {
if let Err(error) = handle_hws_restore(&mut ctx, &handle, meta_entries).await {
tracing::error!(actor_id = %ctx.actor_id, ?error, "actor hibernation restore failed");
tracing::error!(?error, "actor hibernation restore failed");
send_event(
&mut ctx,
protocol::Event::EventActorStateUpdate(protocol::EventActorStateUpdate {
@@ -241,7 +250,7 @@
}
} => {
if let Some(result) = maybe_task {
handle_http_request_task_result(&ctx, result);
handle_http_request_task_result(result);
}
}
msg = async {
@@ -275,7 +284,6 @@
} => {
if pending_stop.is_some() {
tracing::warn!(
actor_id = %ctx.actor_id,
command_idx,
"ignoring duplicate stop while actor teardown is in progress"
);
@@ -294,7 +302,6 @@
ToActor::Lost => {
if pending_stop.is_some() {
tracing::warn!(
actor_id = %ctx.actor_id,
"ignoring lost signal while actor teardown is in progress"
);
continue;
@@ -368,7 +375,7 @@
}

abort_and_join_http_request_tasks(&mut ctx, &mut http_request_tasks).await;
tracing::debug!(actor_id = %ctx.actor_id, "envoy actor stopped");
tracing::debug!("envoy actor stopped");
}

fn send_event(ctx: &mut ActorContext, inner: protocol::Event) {
@@ -409,7 +416,7 @@ async fn begin_stop(
.await;

if let Err(error) = stop_result {
tracing::error!(actor_id = %ctx.actor_id, ?error, "actor stop failed");
tracing::error!(?error, "actor stop failed");
stop_code = protocol::StopCode::Error;
if stop_message.is_none() {
stop_message = Some(format!("{error:#}"));
@@ -451,7 +458,6 @@ fn finalize_stop(
}
Err(error) => {
tracing::warn!(
actor_id = %ctx.actor_id,
?error,
"actor stop completion handle dropped before signaling teardown result"
);
@@ -467,7 +473,7 @@ fn send_stopped_event_for_result(
stop_result: anyhow::Result<()>,
) {
if let Err(error) = stop_result {
tracing::error!(actor_id = %ctx.actor_id, ?error, "actor stop completion failed");
tracing::error!(?error, "actor stop completion failed");
stop_code = protocol::StopCode::Error;
if stop_message.is_none() {
stop_message = Some(format!("{error:#}"));
@@ -541,37 +547,40 @@ fn handle_req_start(
let request_id = message_id.request_id;
let request_guard = ActiveHttpRequestGuard::new(ctx.active_http_request_count.clone());

http_request_tasks.spawn(async move {
let _request_guard = request_guard;
let response = shared
.config
.callbacks
.fetch(handle_clone, actor_id, gateway_id, request_id, request)
.await;
http_request_tasks.spawn(
async move {
let _request_guard = request_guard;
let response = shared
.config
.callbacks
.fetch(handle_clone, actor_id, gateway_id, request_id, request)
.await;

match response {
Ok(response) => {
send_response(&shared, gateway_id, request_id, response).await;
}
Err(error) => {
tracing::error!(?error, "fetch failed");
match response {
Ok(response) => {
send_response(&shared, gateway_id, request_id, response).await;
}
Err(error) => {
tracing::error!(?error, "fetch failed");
}
}
}
});
.in_current_span(),
);

if !req.stream {
ctx.pending_requests
.remove(&[&message_id.gateway_id, &message_id.request_id]);
}
}

fn handle_http_request_task_result(ctx: &ActorContext, result: Result<(), JoinError>) {
fn handle_http_request_task_result(result: Result<(), JoinError>) {
if let Err(error) = result {
if error.is_cancelled() {
return;
}

tracing::error!(actor_id = %ctx.actor_id, ?error, "http request task failed");
tracing::error!(?error, "http request task failed");
}
}

@@ -585,15 +594,14 @@ async fn abort_and_join_http_request_tasks(

let active_http_request_count = ctx.active_http_request_count.load();
tracing::debug!(
actor_id = %ctx.actor_id,
active_http_request_count,
"aborting in-flight http request tasks"
);

http_request_tasks.abort_all();

while let Some(result) = http_request_tasks.join_next().await {
handle_http_request_task_result(ctx, result);
handle_http_request_task_result(result);
}
}

@@ -633,7 +641,7 @@ fn spawn_ws_outgoing_task(
request_id: protocol::RequestId,
mut outgoing_rx: mpsc::UnboundedReceiver<crate::config::WsOutgoing>,
) {
tokio::spawn(async move {
let ws_task = async move {
let mut idx: u16 = 0;
while let Some(msg) = outgoing_rx.recv().await {
idx += 1;
Expand Down Expand Up @@ -681,7 +689,8 @@ fn spawn_ws_outgoing_task(
}
}
}
});
};
tokio::spawn(ws_task.in_current_span());
}

async fn handle_ws_open(
@@ -896,7 +905,6 @@ async fn handle_ws_message(
if wrapping_lte_u16(received_index, previous_index) {
tracing::info!(
request_id = id_to_str(&message_id.request_id),
actor_id = %ctx.actor_id,
previous_index,
received_index,
"received duplicate hibernating websocket message"
@@ -908,7 +916,6 @@
if received_index != expected_index {
tracing::warn!(
request_id = id_to_str(&message_id.request_id),
actor_id = %ctx.actor_id,
previous_index,
expected_index,
received_index,
@@ -1621,6 +1628,7 @@ mod tests {
)),
protocol_metadata: Arc::new(tokio::sync::Mutex::new(None)),
shutting_down: std::sync::atomic::AtomicBool::new(false),
stopped_tx: tokio::sync::watch::channel(true).0,
});
(shared, envoy_rx)
}
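Every change in actor.rs follows one pattern: `actor_inner` now opens a `tracing` span carrying `actor_id`, `generation`, and `actor_key`, which makes the per-call `actor_id = %ctx.actor_id` fields redundant, and tasks spawned off the actor loop attach that span explicitly with `in_current_span()`, since spawned futures do not inherit the active span on their own. A condensed sketch of the pattern, with a hypothetical `run_actor` standing in for `actor_inner`:

```rust
use tracing::Instrument;

#[tracing::instrument(skip_all, fields(actor_id = %actor_id))]
async fn run_actor(actor_id: String) {
    // Events inside this function carry `actor_id` via the span, so the
    // field no longer needs to be repeated at every call site.
    tracing::debug!("envoy actor stopped");

    // Spawned tasks run outside the span unless it is attached explicitly.
    tokio::spawn(
        async move {
            tracing::error!("fetch failed"); // still tagged with actor_id
        }
        .in_current_span(),
    );
}
```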
6 changes: 6 additions & 0 deletions engine/sdks/rust/envoy-client/src/context.rs
@@ -7,6 +7,7 @@ use rivet_envoy_protocol as protocol;
use rivet_util::async_counter::AsyncCounter;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::watch;

use crate::actor::ToActor;
use crate::config::EnvoyConfig;
@@ -29,6 +30,11 @@
pub ws_tx: Arc<Mutex<Option<mpsc::UnboundedSender<WsTxMessage>>>>,
pub protocol_metadata: Arc<Mutex<Option<protocol::ProtocolMetadata>>>,
pub shutting_down: AtomicBool,
// Latched signal fired by `envoy_loop` after its cleanup block completes.
// Waiters observing `true` are guaranteed that the loop has exited and
// every pending KV/SQLite request has been resolved (with `EnvoyShutdownError`
// if it didn't complete naturally).
pub stopped_tx: watch::Sender<bool>,
}

#[derive(Debug)]
14 changes: 7 additions & 7 deletions engine/sdks/rust/envoy-client/src/envoy.rs
@@ -252,6 +252,7 @@ pub fn start_envoy_sync(config: EnvoyConfig) -> EnvoyHandle {
fn start_envoy_sync_inner(config: EnvoyConfig) -> EnvoyHandle {
let (envoy_tx, envoy_rx) = mpsc::unbounded_channel::<ToEnvoyMessage>();
let (start_tx, start_rx) = tokio::sync::watch::channel(());
let (stopped_tx, _stopped_rx) = tokio::sync::watch::channel(false);

let envoy_key = uuid::Uuid::new_v4().to_string();
let shared = Arc::new(SharedContext {
@@ -264,20 +265,14 @@
ws_tx: Arc::new(tokio::sync::Mutex::new(None)),
protocol_metadata: Arc::new(tokio::sync::Mutex::new(None)),
shutting_down: std::sync::atomic::AtomicBool::new(false),
stopped_tx,
});

let handle = EnvoyHandle {
shared: shared.clone(),
started_rx: start_rx,
};

// Start signal handler
let handle2 = handle.clone();
tokio::spawn(async move {
let _ = tokio::signal::ctrl_c().await;
handle2.shutdown(false);
});

start_connection(shared.clone());

let ctx = EnvoyContext {
@@ -459,6 +454,11 @@
tracing::info!("envoy stopped");

ctx.shared.config.callbacks.on_shutdown();

// Latched signal: waiters on `EnvoyHandle::wait_stopped` observe this and
// any future callers of `wait_stopped` resolve immediately because watch
// retains the last value.
let _ = ctx.shared.stopped_tx.send(true);
}

async fn handle_conn_message(
Expand Down
1 change: 1 addition & 0 deletions engine/sdks/rust/envoy-client/src/events.rs
@@ -164,6 +164,7 @@ mod tests {
)),
protocol_metadata: Arc::new(tokio::sync::Mutex::new(None)),
shutting_down: std::sync::atomic::AtomicBool::new(false),
stopped_tx: tokio::sync::watch::channel(true).0,
});
let handle = EnvoyHandle {
shared: shared.clone(),
23 changes: 23 additions & 0 deletions engine/sdks/rust/envoy-client/src/handle.rs
@@ -36,6 +36,29 @@ impl EnvoyHandle {
}
}

/// Resolves when the envoy loop has finished its cleanup block.
///
/// Returning does NOT imply successful delivery of pending KV/SQLite/tunnel
/// requests. The cleanup block errors out every outstanding request with
/// `"envoy shutting down"`. Callers needing durability must wait on individual
/// request acks before invoking shutdown.
///
/// Latched: safe to call before, during, or after the envoy loop exits.
/// A waiter arriving after the loop already exited resolves immediately.
pub async fn wait_stopped(&self) {
let mut rx = self.shared.stopped_tx.subscribe();
if *rx.borrow_and_update() {
return;
}
let _ = rx.changed().await;
}

/// Convenience: signal shutdown then await `wait_stopped`.
pub async fn shutdown_and_wait(&self, immediate: bool) {
self.shutdown(immediate);
self.wait_stopped().await;
}

pub async fn get_protocol_metadata(&self) -> Option<protocol::ProtocolMetadata> {
self.shared.protocol_metadata.lock().await.clone()
}
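The stop signal works because `tokio::sync::watch` retains its last value, which is what makes the latch safe for subscribers that arrive after the loop has already exited. A minimal standalone sketch of the same pattern (names here are illustrative, not the crate's API):

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (stopped_tx, mut stopped_rx) = watch::channel(false);

    // Cleanup path: latch the value exactly once. `send` only fails when
    // every receiver is gone, which a latch can safely ignore.
    tokio::spawn(async move {
        // ... run the loop, resolve pending requests ...
        let _ = stopped_tx.send(true);
    });

    // Waiter: `borrow_and_update` marks the current value as seen; if the
    // latch is already set, return without awaiting. Otherwise await the
    // next change (or sender drop, which also means "stopped").
    if !*stopped_rx.borrow_and_update() {
        let _ = stopped_rx.changed().await;
    }
}
```

In the real code, `EnvoyHandle::wait_stopped` plays the waiter role and `envoy_loop`'s cleanup block plays the sender role.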
5 changes: 5 additions & 0 deletions rivetkit-rust/engine/artifacts/errors/registry.shut_down.json
@@ -0,0 +1,5 @@
{
"code": "shut_down",
"group": "registry",
"message": "Registry is shut down."
}
7 changes: 7 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/CLAUDE.md
@@ -24,6 +24,13 @@
- Flush the actor-connect `WebSocketSender` after queuing a setup `Error` frame and before closing so the envoy writer handles the error before the close terminates the connection.
- Bound actor-connect websocket setup at the registry boundary as well as inside the actor task. The HTTP upgrade can complete before `connection_open` replies, so a missing reply must still close the socket instead of idling until the client test timeout.

## Run modes

- Two run modes exist for `CoreRegistry`. **Persistent envoy**: `serve_with_config(...)` starts one outbound envoy via `start_envoy` and holds it for the process lifetime; used by standalone Rust binaries and TS `registry.start()`. **Serverless request**: `into_serverless_runtime(...)` returns a `CoreServerlessRuntime` whose `handle_request(...)` lazily starts an envoy on first request via `ensure_envoy(...)` and caches it; used by Node/Bun/Deno HTTP hosts and platform fetch handlers. Both modes end up holding a long-lived `EnvoyHandle`.
- Shutdown is a property of the host's `CoreRegistry` handle, not of whichever entrypoint ran first. Route process-level shutdown (SIGINT/SIGTERM) through a single `CoreRegistry::shutdown()` that trips one shared cancel token observed by `serve_with_config` and calls `CoreServerlessRuntime::shutdown()` on the cached runtime (see the sketch after this list). Never attach the shutdown signal to `serve_with_config`'s config parameter; that misses the serverless-request mode entirely.
- `rivetkit-core` and `rivet-envoy-client` must not install process signal handlers (no `tokio::signal::ctrl_c()` in library code). `tokio::signal::ctrl_c()` calls `sigaction(SIGINT, ...)` at the POSIX level and prevents Node from exiting when rivetkit is embedded via NAPI. Signal policy belongs to the host binary or the TS registry layer.
- Per-request `CancellationToken` on `handle_serverless_request` cancels a single in-flight request and does not tear down the cached envoy. Do not overload it with registry shutdown.
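
A minimal sketch of that routing, assuming a hypothetical `shutdown_token` field and a simplified `serve_with_config` signature:

```rust
use tokio_util::sync::CancellationToken;

// Hypothetical shape: the registry handle owns one shared cancel token.
pub struct CoreRegistry {
    shutdown_token: CancellationToken,
}

impl CoreRegistry {
    // Single process-level shutdown entrypoint. The host binary (or the
    // TS registry layer) decides when to call this; library code never
    // installs signal handlers itself. A real implementation would also
    // call CoreServerlessRuntime::shutdown() on a cached runtime so both
    // run modes observe the same signal.
    pub fn shutdown(&self) {
        self.shutdown_token.cancel();
    }

    pub async fn serve_with_config(&self) {
        // Persistent mode: the real loop would race the long-lived envoy
        // future against this; here we just wait for the token to trip.
        self.shutdown_token.cancelled().await;
    }
}
```

The counter example below shows the host-binary side: install `ctrl_c` in `main`, cancel the token, then join the serve task.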

## Test harness

- `tests/modules/task.rs` tests that install a tracing subscriber with `set_default(...)` must take `test_hook_lock()` first, or full `cargo test` parallelism makes the log-capture assertions flaky.
17 changes: 14 additions & 3 deletions rivetkit-rust/packages/rivetkit-core/examples/counter.rs
@@ -99,8 +99,19 @@ fn counter_factory() -> ActorFactory {
async fn main() -> Result<()> {
let mut registry = CoreRegistry::new();
registry.register("counter", counter_factory());
tokio::select! {
res = registry.serve() => res,
_ = tokio::signal::ctrl_c() => Ok(()),
let token = tokio_util::sync::CancellationToken::new();
let serve = tokio::spawn({
let token = token.clone();
async move { registry.serve(token).await }
});
match tokio::signal::ctrl_c().await {
Ok(()) => {}
Err(err) => tracing::warn!(?err, "ctrl_c install failed; cancelling anyway"),
}
token.cancel();
match serve.await {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(err),
Err(join_err) => Err(join_err.into()),
}
}