Skip to content

Commit 134c537

Browse files
committed
feat(rivetkit): expose runtime diagnostics
1 parent b5f770f commit 134c537

23 files changed

Lines changed: 648 additions & 66 deletions

File tree

engine/sdks/rust/envoy-client/src/actor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1652,6 +1652,7 @@ mod tests {
16521652
envoy_key: "test-envoy".to_string(),
16531653
envoy_tx,
16541654
actors: Arc::new(std::sync::Mutex::new(HashMap::new())),
1655+
actors_notify: Arc::new(tokio::sync::Notify::new()),
16551656
live_tunnel_requests: Arc::new(std::sync::Mutex::new(HashMap::new())),
16561657
pending_hibernation_restores: Arc::new(std::sync::Mutex::new(HashMap::new())),
16571658
ws_tx: Arc::new(tokio::sync::Mutex::new(

engine/sdks/rust/envoy-client/src/context.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ use std::sync::atomic::AtomicBool;
55

66
use crate::async_counter::AsyncCounter;
77
use rivet_envoy_protocol as protocol;
8-
use tokio::sync::Mutex;
98
use tokio::sync::mpsc;
9+
use tokio::sync::Mutex;
10+
use tokio::sync::Notify;
1011
use tokio::sync::watch;
1112

1213
use crate::actor::ToActor;
@@ -24,6 +25,7 @@ pub struct SharedContext {
2425
pub envoy_key: String,
2526
pub envoy_tx: mpsc::UnboundedSender<ToEnvoyMessage>,
2627
pub actors: Arc<StdMutex<HashMap<String, HashMap<u32, SharedActorEntry>>>>,
28+
pub actors_notify: Arc<Notify>,
2729
pub live_tunnel_requests: Arc<StdMutex<HashMap<[u8; 8], String>>>,
2830
pub pending_hibernation_restores:
2931
Arc<StdMutex<HashMap<String, Vec<HibernatingWebSocketMetadata>>>>,

engine/sdks/rust/envoy-client/src/envoy.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,8 @@ impl EnvoyContext {
183183
},
184184
);
185185

186+
self.shared.actors_notify.notify_waiters();
187+
186188
if let Some(messages) = self.buffered_actor_messages.remove(&buffered_actor_id) {
187189
for message in messages {
188190
match message {
@@ -216,6 +218,7 @@ impl EnvoyContext {
216218
shared.remove(actor_id);
217219
}
218220
}
221+
self.shared.actors_notify.notify_waiters();
219222
}
220223

221224
pub fn get_actor(&self, actor_id: &str, generation: Option<u32>) -> Option<&ActorEntry> {
@@ -297,6 +300,7 @@ fn start_envoy_sync_inner(config: EnvoyConfig) -> EnvoyHandle {
297300
envoy_key,
298301
envoy_tx: envoy_tx.clone(),
299302
actors: Arc::new(std::sync::Mutex::new(HashMap::new())),
303+
actors_notify: Arc::new(tokio::sync::Notify::new()),
300304
live_tunnel_requests: Arc::new(std::sync::Mutex::new(HashMap::new())),
301305
pending_hibernation_restores: Arc::new(std::sync::Mutex::new(HashMap::new())),
302306
ws_tx: Arc::new(tokio::sync::Mutex::new(None)),

engine/sdks/rust/envoy-client/src/events.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ mod tests {
164164
envoy_key: "test-envoy".to_string(),
165165
envoy_tx,
166166
actors: Arc::new(std::sync::Mutex::new(HashMap::new())),
167+
actors_notify: Arc::new(tokio::sync::Notify::new()),
167168
live_tunnel_requests: Arc::new(std::sync::Mutex::new(HashMap::new())),
168169
pending_hibernation_restores: Arc::new(std::sync::Mutex::new(HashMap::new())),
169170
ws_tx: Arc::new(tokio::sync::Mutex::new(

engine/sdks/rust/envoy-client/src/handle.rs

Lines changed: 92 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@ pub struct EnvoyHandle {
1717
pub(crate) started_rx: tokio::sync::watch::Receiver<()>,
1818
}
1919

20+
#[derive(Debug, Clone, PartialEq, Eq)]
21+
pub struct ServerlessActorStart {
22+
pub actor_id: String,
23+
pub generation: u32,
24+
}
25+
2026
impl EnvoyHandle {
2127
#[doc(hidden)]
2228
pub fn from_shared(shared: Arc<SharedContext>) -> Self {
@@ -85,6 +91,23 @@ impl EnvoyHandle {
8591
&self.shared.config.namespace
8692
}
8793

94+
pub fn active_actor_count(&self) -> usize {
95+
let guard = self
96+
.shared
97+
.actors
98+
.lock()
99+
.expect("shared actor registry poisoned");
100+
guard
101+
.values()
102+
.map(|generations| {
103+
generations
104+
.values()
105+
.filter(|actor| !actor.handle.is_closed())
106+
.count()
107+
})
108+
.sum()
109+
}
110+
88111
pub fn pool_name(&self) -> &str {
89112
&self.shared.config.pool_name
90113
}
@@ -138,6 +161,40 @@ impl EnvoyHandle {
138161
rx.await.ok().flatten()
139162
}
140163

164+
pub async fn wait_actor_registered_then_stopped(&self, actor_id: &str, generation: u32) {
165+
let mut registered = false;
166+
loop {
167+
let notified = self.shared.actors_notify.notified();
168+
if self.is_stopped() {
169+
return;
170+
}
171+
172+
let actor_is_registered = {
173+
let guard = self
174+
.shared
175+
.actors
176+
.lock()
177+
.expect("shared actor registry poisoned");
178+
guard
179+
.get(actor_id)
180+
.and_then(|generations| generations.get(&generation))
181+
.is_some()
182+
};
183+
184+
if registered && !actor_is_registered {
185+
return;
186+
}
187+
if actor_is_registered {
188+
registered = true;
189+
}
190+
191+
tokio::select! {
192+
_ = notified => {}
193+
_ = self.wait_stopped() => return,
194+
}
195+
}
196+
}
197+
141198
pub fn http_request_counter(
142199
&self,
143200
actor_id: &str,
@@ -484,6 +541,35 @@ impl EnvoyHandle {
484541
/// Inject a serverless start payload into the envoy.
485542
/// The payload is a u16 LE protocol version followed by a serialized ToEnvoy message.
486543
pub async fn start_serverless_actor(&self, payload: &[u8]) -> anyhow::Result<()> {
544+
let (message, _) = decode_serverless_actor_start_payload(payload)?;
545+
546+
// Wait for envoy to be started before injecting
547+
self.started().await?;
548+
549+
tracing::debug!(
550+
data = crate::stringify::stringify_to_envoy(&message),
551+
"received serverless start"
552+
);
553+
self.shared
554+
.envoy_tx
555+
.send(ToEnvoyMessage::ConnMessage { message })
556+
.map_err(|_| anyhow::anyhow!("envoy channel closed"))?;
557+
558+
Ok(())
559+
}
560+
561+
pub fn decode_serverless_actor_start(
562+
&self,
563+
payload: &[u8],
564+
) -> anyhow::Result<ServerlessActorStart> {
565+
let (_, actor_start) = decode_serverless_actor_start_payload(payload)?;
566+
Ok(actor_start)
567+
}
568+
}
569+
570+
fn decode_serverless_actor_start_payload(
571+
payload: &[u8],
572+
) -> anyhow::Result<(protocol::ToEnvoy, ServerlessActorStart)> {
487573
use vbare::OwnedVersionedData;
488574

489575
if payload.len() < 2 {
@@ -524,21 +610,15 @@ impl EnvoyHandle {
524610
anyhow::bail!("invalid serverless payload: expected CommandStartActor");
525611
}
526612

527-
// Wait for envoy to be started before injecting
528-
self.started().await?;
529-
530-
tracing::debug!(
531-
data = crate::stringify::stringify_to_envoy(&message),
532-
"received serverless start"
533-
);
534-
self.shared
535-
.envoy_tx
536-
.send(ToEnvoyMessage::ConnMessage { message })
537-
.map_err(|_| anyhow::anyhow!("envoy channel closed"))?;
613+
let actor_start = ServerlessActorStart {
614+
actor_id: commands[0].checkpoint.actor_id.clone(),
615+
generation: commands[0].checkpoint.generation,
616+
};
538617

539-
Ok(())
618+
Ok((message, actor_start))
540619
}
541620

621+
impl EnvoyHandle {
542622
async fn send_kv_request(
543623
&self,
544624
actor_id: String,

engine/sdks/rust/envoy-client/src/sqlite.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,7 @@ mod tests {
463463
envoy_key: "test-envoy".to_string(),
464464
envoy_tx,
465465
actors: Arc::new(std::sync::Mutex::new(HashMap::new())),
466+
actors_notify: Arc::new(tokio::sync::Notify::new()),
466467
live_tunnel_requests: Arc::new(std::sync::Mutex::new(HashMap::new())),
467468
pending_hibernation_restores: Arc::new(std::sync::Mutex::new(HashMap::new())),
468469
ws_tx: Arc::new(tokio::sync::Mutex::new(

examples/kitchen-sink/src/server.ts

Lines changed: 125 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)