Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion rivetkit-rust/packages/rivetkit-core/examples/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,8 @@ fn counter_factory() -> ActorFactory {
async fn main() -> Result<()> {
let mut registry = CoreRegistry::new();
registry.register("counter", counter_factory());
registry.serve().await
tokio::select! {
res = registry.serve() => res,
_ = tokio::signal::ctrl_c() => Ok(()),
}
}
3 changes: 2 additions & 1 deletion rivetkit-rust/packages/rivetkit-core/src/engine_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ impl EngineProcessManager {
.env("RIVET__METRICS__PORT", metrics_port.to_string())
.env("RIVET__FILE_SYSTEM__PATH", &db_path)
.stdout(Stdio::piped())
.stderr(Stdio::piped());
.stderr(Stdio::piped())
.kill_on_drop(true);

let mut child = command
.spawn()
Expand Down
22 changes: 8 additions & 14 deletions rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ impl CoreRegistry {

pub async fn serve_with_config(self, config: ServeConfig) -> Result<()> {
let dispatcher = self.into_dispatcher(&config);
let mut engine_process = match config.engine_binary_path.as_ref() {
let _engine_process = match config.engine_binary_path.as_ref() {
Some(binary_path) => {
Some(EngineProcessManager::start(binary_path, &config.endpoint).await?)
}
Expand All @@ -427,7 +427,7 @@ impl CoreRegistry {
dispatcher: dispatcher.clone(),
});

let handle = start_envoy(rivet_envoy_client::config::EnvoyConfig {
let _handle = start_envoy(rivet_envoy_client::config::EnvoyConfig {
version: config.version,
endpoint: config.endpoint,
token: config.token,
Expand All @@ -441,18 +441,12 @@ impl CoreRegistry {
})
.await;

let shutdown_signal = tokio::signal::ctrl_c()
.await
.context("wait for registry shutdown signal");
handle.shutdown(false);

if let Some(engine_process) = engine_process.take() {
engine_process.shutdown().await?;
}

shutdown_signal?;

Ok(())
// Do not install `tokio::signal::ctrl_c()` here. It calls
// `sigaction(SIGINT, ...)` at the POSIX level, which overrides the
// host's default SIGINT handling when rivetkit-core is embedded in
// Node via NAPI and leaves the host process unable to exit. Callers
// drive shutdown themselves by dropping the task.
std::future::pending::<Result<()>>().await
}

fn into_dispatcher(self, config: &ServeConfig) -> Arc<RegistryDispatcher> {
Expand Down
5 changes: 4 additions & 1 deletion rivetkit-rust/packages/rivetkit/examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,8 @@ async fn save_chat_state(ctx: &Ctx<Chat>, state: &ChatState) -> Result<()> {
async fn main() -> Result<()> {
let mut registry = Registry::new();
registry.register::<Chat, _, _>("chat", run);
registry.serve().await
tokio::select! {
res = registry.serve() => res,
_ = tokio::signal::ctrl_c() => Ok(()),
}
}
5 changes: 4 additions & 1 deletion rivetkit-rust/packages/rivetkit/examples/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,5 +105,8 @@ async fn main() -> Result<()> {
.action("increment", Counter::increment)
.action("get_count", Counter::get_count)
.done();
registry.serve().await
tokio::select! {
res = registry.serve() => res,
_ = tokio::signal::ctrl_c() => Ok(()),
}
}
Loading