Skip to content

Commit cf25e1a

Browse files
committed
fix(rivetkit): standardize startEngine + runtime mode based on config
1 parent 79767d3 commit cf25e1a

131 files changed

Lines changed: 5558 additions & 16733 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.claude/reference/build-troubleshooting.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
Known foot-guns when building RivetKit packages.
44

5+
## `registry.start()` fails with missing `@rivetkit/engine-cli-*`
6+
7+
- In monorepo development, `registry.start()` may start the local engine. If the optional `@rivetkit/engine-cli-*` platform package is missing, run `cargo build -p rivet-engine` and set `RIVET_ENGINE_BINARY=/home/nathan/r5/target/debug/rivet-engine`.
8+
59
## DTS / type build fails with missing `@rivetkit/*`
610

711
- If `rivetkit` type or DTS builds fail with missing `@rivetkit/*` declarations, run `pnpm build -F rivetkit` from repo root (Turbo build path) **before** changing TypeScript `paths`.

engine/packages/api-public/src/actors/utils.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,12 @@ pub async fn find_dc_for_actor_creation(
141141
) -> Result<u16> {
142142
let requested_dc_label = if let Some(dc_name) = &dc_name {
143143
// Use user-configured DC
144-
Some(ctx.config()
145-
.dc_for_name(dc_name)
146-
.ok_or_else(|| rivet_api_util::errors::Datacenter::NotFound.build())?
147-
.datacenter_label)
144+
Some(
145+
ctx.config()
146+
.dc_for_name(dc_name)
147+
.ok_or_else(|| rivet_api_util::errors::Datacenter::NotFound.build())?
148+
.datacenter_label,
149+
)
148150
} else {
149151
None
150152
};

engine/packages/api-public/src/runner_configs/serverless_health_check.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,12 @@ pub struct ServerlessHealthCheckRequest {
3434
#[serde(rename_all = "snake_case")]
3535
#[schema(as = RunnerConfigsServerlessHealthCheckResponse)]
3636
pub enum ServerlessHealthCheckResponse {
37-
Success { version: String },
38-
Failure { error: ServerlessMetadataErrorEnvelope },
37+
Success {
38+
version: String,
39+
},
40+
Failure {
41+
error: ServerlessMetadataErrorEnvelope,
42+
},
3943
}
4044

4145
#[utoipa::path(

engine/packages/engine/tests/common/ctx.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,8 @@ impl TestCtx {
154154
opts.auth_admin_token.clone(),
155155
)
156156
});
157-
let mut dcs: Vec<TestDatacenter> = futures_util::future::try_join_all(setup_futures).await?;
157+
let mut dcs: Vec<TestDatacenter> =
158+
futures_util::future::try_join_all(setup_futures).await?;
158159
dcs.sort_by_key(|dc| dc.config.dc_label());
159160

160161
Ok(Self { dcs, opts })

engine/packages/engine/tests/common/test_envoy.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -409,21 +409,20 @@ impl rivet_test_envoy::EnvoyCallbacks for TestEnvoyCallbacks {
409409
}
410410
}
411411

412-
fn spawn_event_bridge(
413-
handle: EnvoyHandle,
414-
mut event_rx: mpsc::UnboundedReceiver<ActorEvent>,
415-
) {
412+
fn spawn_event_bridge(handle: EnvoyHandle, mut event_rx: mpsc::UnboundedReceiver<ActorEvent>) {
416413
tokio::spawn(async move {
417414
while let Some(event) = event_rx.recv().await {
418415
match event.event {
419-
rivet_runner_protocol::mk2::Event::EventActorIntent(intent) => match intent.intent {
420-
rivet_runner_protocol::mk2::ActorIntent::ActorIntentSleep => {
421-
handle.sleep_actor(event.actor_id, Some(event.generation));
422-
}
423-
rivet_runner_protocol::mk2::ActorIntent::ActorIntentStop => {
424-
handle.stop_actor(event.actor_id, Some(event.generation), None);
416+
rivet_runner_protocol::mk2::Event::EventActorIntent(intent) => {
417+
match intent.intent {
418+
rivet_runner_protocol::mk2::ActorIntent::ActorIntentSleep => {
419+
handle.sleep_actor(event.actor_id, Some(event.generation));
420+
}
421+
rivet_runner_protocol::mk2::ActorIntent::ActorIntentStop => {
422+
handle.stop_actor(event.actor_id, Some(event.generation), None);
423+
}
425424
}
426-
},
425+
}
427426
rivet_runner_protocol::mk2::Event::EventActorSetAlarm(alarm) => {
428427
handle.set_alarm(event.actor_id, alarm.alarm_ts, Some(event.generation));
429428
}
@@ -494,7 +493,10 @@ fn spawn_kv_bridge(handle: EnvoyHandle, mut kv_rx: mpsc::UnboundedReceiver<KvReq
494493
})
495494
}
496495
rivet_runner_protocol::mk2::KvRequestData::KvPutRequest(body) => handle
497-
.kv_put(req.actor_id, body.keys.into_iter().zip(body.values).collect())
496+
.kv_put(
497+
req.actor_id,
498+
body.keys.into_iter().zip(body.values).collect(),
499+
)
498500
.await
499501
.map(|_| rivet_runner_protocol::mk2::KvResponseData::KvPutResponse),
500502
rivet_runner_protocol::mk2::KvRequestData::KvDeleteRequest(body) => handle

engine/packages/engine/tests/common/test_runner.rs

Lines changed: 55 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,7 @@ impl ActorConfig {
9797

9898
pub async fn send_kv_get(&self, keys: Vec<Vec<u8>>) -> Result<mk2::KvGetResponse> {
9999
match self
100-
.send_kv(mk2::KvRequestData::KvGetRequest(mk2::KvGetRequest {
101-
keys,
102-
}))
100+
.send_kv(mk2::KvRequestData::KvGetRequest(mk2::KvGetRequest { keys }))
103101
.await?
104102
{
105103
mk2::KvResponseData::KvGetResponse(res) => Ok(res),
@@ -144,9 +142,9 @@ impl ActorConfig {
144142

145143
pub async fn send_kv_delete(&self, keys: Vec<Vec<u8>>) -> Result<()> {
146144
match self
147-
.send_kv(mk2::KvRequestData::KvDeleteRequest(
148-
mk2::KvDeleteRequest { keys },
149-
))
145+
.send_kv(mk2::KvRequestData::KvDeleteRequest(mk2::KvDeleteRequest {
146+
keys,
147+
}))
150148
.await?
151149
{
152150
mk2::KvResponseData::KvDeleteResponse => Ok(()),
@@ -305,7 +303,9 @@ impl RunnerConfigBuilder {
305303
endpoint: self.endpoint.context("endpoint is required")?,
306304
token: self.token.unwrap_or_else(|| "dev".to_string()),
307305
namespace: self.namespace.context("namespace is required")?,
308-
runner_name: self.runner_name.unwrap_or_else(|| "test-runner".to_string()),
306+
runner_name: self
307+
.runner_name
308+
.unwrap_or_else(|| "test-runner".to_string()),
309309
runner_key: self
310310
.runner_key
311311
.unwrap_or_else(|| format!("key-{:012x}", rand::random::<u64>())),
@@ -428,7 +428,9 @@ impl Runner {
428428
.context("failed to connect to runner WebSocket")?;
429429

430430
ws_stream
431-
.send(Message::Binary(self.encode_to_server(self.build_init())?.into()))
431+
.send(Message::Binary(
432+
self.encode_to_server(self.build_init())?.into(),
433+
))
432434
.await
433435
.context("failed to send runner init")?;
434436

@@ -632,10 +634,10 @@ impl Runner {
632634
actor_id: actor_id.clone(),
633635
generation,
634636
});
635-
self.actors.lock().await.insert(
636-
actor_id.clone(),
637-
ActorState { generation, actor },
638-
);
637+
self.actors
638+
.lock()
639+
.await
640+
.insert(actor_id.clone(), ActorState { generation, actor });
639641

640642
match start_result {
641643
ActorStartResult::Running => {
@@ -699,9 +701,12 @@ impl Runner {
699701
});
700702
});
701703
}
702-
ActorStopResult::Crash { code, message } => {
703-
self.send_state_stopped(checkpoint.actor_id, checkpoint.generation, code, Some(message))
704-
}
704+
ActorStopResult::Crash { code, message } => self.send_state_stopped(
705+
checkpoint.actor_id,
706+
checkpoint.generation,
707+
code,
708+
Some(message),
709+
),
705710
}
706711

707712
Ok(())
@@ -731,7 +736,11 @@ impl Runner {
731736
});
732737
}
733738

734-
async fn send_actor_event(&self, ws_stream: &mut WsStream, actor_event: ActorEvent) -> Result<()> {
739+
async fn send_actor_event(
740+
&self,
741+
ws_stream: &mut WsStream,
742+
actor_event: ActorEvent,
743+
) -> Result<()> {
735744
let mut indices = self.event_indices.lock().await;
736745
let index = indices
737746
.entry((actor_event.actor_id.clone(), actor_event.generation))
@@ -1081,7 +1090,10 @@ impl CrashOnStartActor {
10811090
}
10821091
}
10831092

1084-
pub fn new_with_notify(exit_code: i32, notify_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>) -> Self {
1093+
pub fn new_with_notify(
1094+
exit_code: i32,
1095+
notify_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
1096+
) -> Self {
10851097
Self {
10861098
exit_code,
10871099
notify_tx: Some(notify_tx),
@@ -1273,13 +1285,29 @@ impl TestActor for StopImmediatelyActor {
12731285
}
12741286

12751287
pub struct CustomActor {
1276-
on_start_fn: Box<dyn Fn(ActorConfig) -> Pin<Box<dyn Future<Output = Result<ActorStartResult>> + Send>> + Send + Sync>,
1277-
on_stop_fn: Box<dyn Fn() -> Pin<Box<dyn Future<Output = Result<ActorStopResult>> + Send>> + Send + Sync>,
1288+
on_start_fn: Box<
1289+
dyn Fn(ActorConfig) -> Pin<Box<dyn Future<Output = Result<ActorStartResult>> + Send>>
1290+
+ Send
1291+
+ Sync,
1292+
>,
1293+
on_stop_fn: Box<
1294+
dyn Fn() -> Pin<Box<dyn Future<Output = Result<ActorStopResult>> + Send>> + Send + Sync,
1295+
>,
12781296
}
12791297

12801298
pub struct CustomActorBuilder {
1281-
on_start_fn: Option<Box<dyn Fn(ActorConfig) -> Pin<Box<dyn Future<Output = Result<ActorStartResult>> + Send>> + Send + Sync>>,
1282-
on_stop_fn: Option<Box<dyn Fn() -> Pin<Box<dyn Future<Output = Result<ActorStopResult>> + Send>> + Send + Sync>>,
1299+
on_start_fn: Option<
1300+
Box<
1301+
dyn Fn(ActorConfig) -> Pin<Box<dyn Future<Output = Result<ActorStartResult>> + Send>>
1302+
+ Send
1303+
+ Sync,
1304+
>,
1305+
>,
1306+
on_stop_fn: Option<
1307+
Box<
1308+
dyn Fn() -> Pin<Box<dyn Future<Output = Result<ActorStopResult>> + Send>> + Send + Sync,
1309+
>,
1310+
>,
12831311
}
12841312

12851313
impl CustomActorBuilder {
@@ -1314,12 +1342,12 @@ impl CustomActorBuilder {
13141342

13151343
pub fn build(self) -> CustomActor {
13161344
CustomActor {
1317-
on_start_fn: self.on_start_fn.unwrap_or_else(|| {
1318-
Box::new(|_| Box::pin(async { Ok(ActorStartResult::Running) }))
1319-
}),
1320-
on_stop_fn: self.on_stop_fn.unwrap_or_else(|| {
1321-
Box::new(|| Box::pin(async { Ok(ActorStopResult::Success) }))
1322-
}),
1345+
on_start_fn: self
1346+
.on_start_fn
1347+
.unwrap_or_else(|| Box::new(|_| Box::pin(async { Ok(ActorStartResult::Running) }))),
1348+
on_stop_fn: self
1349+
.on_stop_fn
1350+
.unwrap_or_else(|| Box::new(|| Box::pin(async { Ok(ActorStopResult::Success) }))),
13231351
}
13241352
}
13251353
}

0 commit comments

Comments
 (0)