Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .claude/reference/build-troubleshooting.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

Known foot-guns when building RivetKit packages.

## `registry.start()` fails with missing `@rivetkit/engine-cli-*`

- In monorepo development, `registry.start()` may start the local engine. If the optional `@rivetkit/engine-cli-*` platform package is missing, run `cargo build -p rivet-engine` and set `RIVET_ENGINE_BINARY=/home/nathan/r5/target/debug/rivet-engine`.

## DTS / type build fails with missing `@rivetkit/*`

- If `rivetkit` type or DTS builds fail with missing `@rivetkit/*` declarations, run `pnpm build -F rivetkit` from repo root (Turbo build path) **before** changing TypeScript `paths`.
Expand Down
10 changes: 6 additions & 4 deletions engine/packages/api-public/src/actors/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,12 @@ pub async fn find_dc_for_actor_creation(
) -> Result<u16> {
let requested_dc_label = if let Some(dc_name) = &dc_name {
// Use user-configured DC
Some(ctx.config()
.dc_for_name(dc_name)
.ok_or_else(|| rivet_api_util::errors::Datacenter::NotFound.build())?
.datacenter_label)
Some(
ctx.config()
.dc_for_name(dc_name)
.ok_or_else(|| rivet_api_util::errors::Datacenter::NotFound.build())?
.datacenter_label,
)
} else {
None
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@ pub struct ServerlessHealthCheckRequest {
#[serde(rename_all = "snake_case")]
#[schema(as = RunnerConfigsServerlessHealthCheckResponse)]
pub enum ServerlessHealthCheckResponse {
Success { version: String },
Failure { error: ServerlessMetadataErrorEnvelope },
Success {
version: String,
},
Failure {
error: ServerlessMetadataErrorEnvelope,
},
}

#[utoipa::path(
Expand Down
3 changes: 2 additions & 1 deletion engine/packages/engine/tests/common/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ impl TestCtx {
opts.auth_admin_token.clone(),
)
});
let mut dcs: Vec<TestDatacenter> = futures_util::future::try_join_all(setup_futures).await?;
let mut dcs: Vec<TestDatacenter> =
futures_util::future::try_join_all(setup_futures).await?;
dcs.sort_by_key(|dc| dc.config.dc_label());

Ok(Self { dcs, opts })
Expand Down
26 changes: 14 additions & 12 deletions engine/packages/engine/tests/common/test_envoy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,21 +409,20 @@ impl rivet_test_envoy::EnvoyCallbacks for TestEnvoyCallbacks {
}
}

fn spawn_event_bridge(
handle: EnvoyHandle,
mut event_rx: mpsc::UnboundedReceiver<ActorEvent>,
) {
fn spawn_event_bridge(handle: EnvoyHandle, mut event_rx: mpsc::UnboundedReceiver<ActorEvent>) {
tokio::spawn(async move {
while let Some(event) = event_rx.recv().await {
match event.event {
rivet_runner_protocol::mk2::Event::EventActorIntent(intent) => match intent.intent {
rivet_runner_protocol::mk2::ActorIntent::ActorIntentSleep => {
handle.sleep_actor(event.actor_id, Some(event.generation));
}
rivet_runner_protocol::mk2::ActorIntent::ActorIntentStop => {
handle.stop_actor(event.actor_id, Some(event.generation), None);
rivet_runner_protocol::mk2::Event::EventActorIntent(intent) => {
match intent.intent {
rivet_runner_protocol::mk2::ActorIntent::ActorIntentSleep => {
handle.sleep_actor(event.actor_id, Some(event.generation));
}
rivet_runner_protocol::mk2::ActorIntent::ActorIntentStop => {
handle.stop_actor(event.actor_id, Some(event.generation), None);
}
}
},
}
rivet_runner_protocol::mk2::Event::EventActorSetAlarm(alarm) => {
handle.set_alarm(event.actor_id, alarm.alarm_ts, Some(event.generation));
}
Expand Down Expand Up @@ -494,7 +493,10 @@ fn spawn_kv_bridge(handle: EnvoyHandle, mut kv_rx: mpsc::UnboundedReceiver<KvReq
})
}
rivet_runner_protocol::mk2::KvRequestData::KvPutRequest(body) => handle
.kv_put(req.actor_id, body.keys.into_iter().zip(body.values).collect())
.kv_put(
req.actor_id,
body.keys.into_iter().zip(body.values).collect(),
)
.await
.map(|_| rivet_runner_protocol::mk2::KvResponseData::KvPutResponse),
rivet_runner_protocol::mk2::KvRequestData::KvDeleteRequest(body) => handle
Expand Down
82 changes: 55 additions & 27 deletions engine/packages/engine/tests/common/test_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,7 @@ impl ActorConfig {

pub async fn send_kv_get(&self, keys: Vec<Vec<u8>>) -> Result<mk2::KvGetResponse> {
match self
.send_kv(mk2::KvRequestData::KvGetRequest(mk2::KvGetRequest {
keys,
}))
.send_kv(mk2::KvRequestData::KvGetRequest(mk2::KvGetRequest { keys }))
.await?
{
mk2::KvResponseData::KvGetResponse(res) => Ok(res),
Expand Down Expand Up @@ -144,9 +142,9 @@ impl ActorConfig {

pub async fn send_kv_delete(&self, keys: Vec<Vec<u8>>) -> Result<()> {
match self
.send_kv(mk2::KvRequestData::KvDeleteRequest(
mk2::KvDeleteRequest { keys },
))
.send_kv(mk2::KvRequestData::KvDeleteRequest(mk2::KvDeleteRequest {
keys,
}))
.await?
{
mk2::KvResponseData::KvDeleteResponse => Ok(()),
Expand Down Expand Up @@ -305,7 +303,9 @@ impl RunnerConfigBuilder {
endpoint: self.endpoint.context("endpoint is required")?,
token: self.token.unwrap_or_else(|| "dev".to_string()),
namespace: self.namespace.context("namespace is required")?,
runner_name: self.runner_name.unwrap_or_else(|| "test-runner".to_string()),
runner_name: self
.runner_name
.unwrap_or_else(|| "test-runner".to_string()),
runner_key: self
.runner_key
.unwrap_or_else(|| format!("key-{:012x}", rand::random::<u64>())),
Expand Down Expand Up @@ -428,7 +428,9 @@ impl Runner {
.context("failed to connect to runner WebSocket")?;

ws_stream
.send(Message::Binary(self.encode_to_server(self.build_init())?.into()))
.send(Message::Binary(
self.encode_to_server(self.build_init())?.into(),
))
.await
.context("failed to send runner init")?;

Expand Down Expand Up @@ -632,10 +634,10 @@ impl Runner {
actor_id: actor_id.clone(),
generation,
});
self.actors.lock().await.insert(
actor_id.clone(),
ActorState { generation, actor },
);
self.actors
.lock()
.await
.insert(actor_id.clone(), ActorState { generation, actor });

match start_result {
ActorStartResult::Running => {
Expand Down Expand Up @@ -699,9 +701,12 @@ impl Runner {
});
});
}
ActorStopResult::Crash { code, message } => {
self.send_state_stopped(checkpoint.actor_id, checkpoint.generation, code, Some(message))
}
ActorStopResult::Crash { code, message } => self.send_state_stopped(
checkpoint.actor_id,
checkpoint.generation,
code,
Some(message),
),
}

Ok(())
Expand Down Expand Up @@ -731,7 +736,11 @@ impl Runner {
});
}

async fn send_actor_event(&self, ws_stream: &mut WsStream, actor_event: ActorEvent) -> Result<()> {
async fn send_actor_event(
&self,
ws_stream: &mut WsStream,
actor_event: ActorEvent,
) -> Result<()> {
let mut indices = self.event_indices.lock().await;
let index = indices
.entry((actor_event.actor_id.clone(), actor_event.generation))
Expand Down Expand Up @@ -1081,7 +1090,10 @@ impl CrashOnStartActor {
}
}

pub fn new_with_notify(exit_code: i32, notify_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>) -> Self {
pub fn new_with_notify(
exit_code: i32,
notify_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
) -> Self {
Self {
exit_code,
notify_tx: Some(notify_tx),
Expand Down Expand Up @@ -1273,13 +1285,29 @@ impl TestActor for StopImmediatelyActor {
}

pub struct CustomActor {
on_start_fn: Box<dyn Fn(ActorConfig) -> Pin<Box<dyn Future<Output = Result<ActorStartResult>> + Send>> + Send + Sync>,
on_stop_fn: Box<dyn Fn() -> Pin<Box<dyn Future<Output = Result<ActorStopResult>> + Send>> + Send + Sync>,
on_start_fn: Box<
dyn Fn(ActorConfig) -> Pin<Box<dyn Future<Output = Result<ActorStartResult>> + Send>>
+ Send
+ Sync,
>,
on_stop_fn: Box<
dyn Fn() -> Pin<Box<dyn Future<Output = Result<ActorStopResult>> + Send>> + Send + Sync,
>,
}

pub struct CustomActorBuilder {
on_start_fn: Option<Box<dyn Fn(ActorConfig) -> Pin<Box<dyn Future<Output = Result<ActorStartResult>> + Send>> + Send + Sync>>,
on_stop_fn: Option<Box<dyn Fn() -> Pin<Box<dyn Future<Output = Result<ActorStopResult>> + Send>> + Send + Sync>>,
on_start_fn: Option<
Box<
dyn Fn(ActorConfig) -> Pin<Box<dyn Future<Output = Result<ActorStartResult>> + Send>>
+ Send
+ Sync,
>,
>,
on_stop_fn: Option<
Box<
dyn Fn() -> Pin<Box<dyn Future<Output = Result<ActorStopResult>> + Send>> + Send + Sync,
>,
>,
}

impl CustomActorBuilder {
Expand Down Expand Up @@ -1314,12 +1342,12 @@ impl CustomActorBuilder {

pub fn build(self) -> CustomActor {
CustomActor {
on_start_fn: self.on_start_fn.unwrap_or_else(|| {
Box::new(|_| Box::pin(async { Ok(ActorStartResult::Running) }))
}),
on_stop_fn: self.on_stop_fn.unwrap_or_else(|| {
Box::new(|| Box::pin(async { Ok(ActorStopResult::Success) }))
}),
on_start_fn: self
.on_start_fn
.unwrap_or_else(|| Box::new(|_| Box::pin(async { Ok(ActorStartResult::Running) }))),
on_stop_fn: self
.on_stop_fn
.unwrap_or_else(|| Box::new(|| Box::pin(async { Ok(ActorStopResult::Success) }))),
}
}
}
Expand Down
Loading
Loading