Skip to content

Commit 54026cd

Browse files
committed
chore(rivetkit): add child spans for logging
1 parent 2a3027f commit 54026cd

36 files changed

Lines changed: 1324 additions & 733 deletions

File tree

engine/sdks/rust/envoy-client/src/actor.rs

Lines changed: 39 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use tokio::sync::mpsc;
99
use tokio::sync::oneshot;
1010
use tokio::sync::oneshot::error::TryRecvError;
1111
use tokio::task::{JoinError, JoinSet};
12+
use tracing::Instrument;
1213

1314
use crate::config::{HttpRequest, HttpResponse, WebSocketMessage};
1415
use crate::connection::ws_send;
@@ -144,6 +145,14 @@ pub fn create_actor(
144145
(tx, active_http_request_count)
145146
}
146147

148+
#[tracing::instrument(
149+
skip_all,
150+
fields(
151+
actor_id = %actor_id,
152+
generation = generation,
153+
actor_key = %config.key.as_deref().unwrap_or(""),
154+
),
155+
)]
147156
async fn actor_inner(
148157
shared: Arc<SharedContext>,
149158
actor_id: String,
@@ -194,7 +203,7 @@ async fn actor_inner(
194203
.await;
195204

196205
if let Err(error) = start_result {
197-
tracing::error!(actor_id = %ctx.actor_id, ?error, "actor start failed");
206+
tracing::error!(?error, "actor start failed");
198207
send_event(
199208
&mut ctx,
200209
protocol::Event::EventActorStateUpdate(protocol::EventActorStateUpdate {
@@ -209,7 +218,7 @@ async fn actor_inner(
209218

210219
if let Some(meta_entries) = handle.take_pending_hibernation_restore(&actor_id) {
211220
if let Err(error) = handle_hws_restore(&mut ctx, &handle, meta_entries).await {
212-
tracing::error!(actor_id = %ctx.actor_id, ?error, "actor hibernation restore failed");
221+
tracing::error!(?error, "actor hibernation restore failed");
213222
send_event(
214223
&mut ctx,
215224
protocol::Event::EventActorStateUpdate(protocol::EventActorStateUpdate {
@@ -241,7 +250,7 @@ async fn actor_inner(
241250
}
242251
} => {
243252
if let Some(result) = maybe_task {
244-
handle_http_request_task_result(&ctx, result);
253+
handle_http_request_task_result(result);
245254
}
246255
}
247256
msg = async {
@@ -275,7 +284,6 @@ async fn actor_inner(
275284
} => {
276285
if pending_stop.is_some() {
277286
tracing::warn!(
278-
actor_id = %ctx.actor_id,
279287
command_idx,
280288
"ignoring duplicate stop while actor teardown is in progress"
281289
);
@@ -294,7 +302,6 @@ async fn actor_inner(
294302
ToActor::Lost => {
295303
if pending_stop.is_some() {
296304
tracing::warn!(
297-
actor_id = %ctx.actor_id,
298305
"ignoring lost signal while actor teardown is in progress"
299306
);
300307
continue;
@@ -368,7 +375,7 @@ async fn actor_inner(
368375
}
369376

370377
abort_and_join_http_request_tasks(&mut ctx, &mut http_request_tasks).await;
371-
tracing::debug!(actor_id = %ctx.actor_id, "envoy actor stopped");
378+
tracing::debug!("envoy actor stopped");
372379
}
373380

374381
fn send_event(ctx: &mut ActorContext, inner: protocol::Event) {
@@ -409,7 +416,7 @@ async fn begin_stop(
409416
.await;
410417

411418
if let Err(error) = stop_result {
412-
tracing::error!(actor_id = %ctx.actor_id, ?error, "actor stop failed");
419+
tracing::error!(?error, "actor stop failed");
413420
stop_code = protocol::StopCode::Error;
414421
if stop_message.is_none() {
415422
stop_message = Some(format!("{error:#}"));
@@ -451,7 +458,6 @@ fn finalize_stop(
451458
}
452459
Err(error) => {
453460
tracing::warn!(
454-
actor_id = %ctx.actor_id,
455461
?error,
456462
"actor stop completion handle dropped before signaling teardown result"
457463
);
@@ -467,7 +473,7 @@ fn send_stopped_event_for_result(
467473
stop_result: anyhow::Result<()>,
468474
) {
469475
if let Err(error) = stop_result {
470-
tracing::error!(actor_id = %ctx.actor_id, ?error, "actor stop completion failed");
476+
tracing::error!(?error, "actor stop completion failed");
471477
stop_code = protocol::StopCode::Error;
472478
if stop_message.is_none() {
473479
stop_message = Some(format!("{error:#}"));
@@ -541,37 +547,40 @@ fn handle_req_start(
541547
let request_id = message_id.request_id;
542548
let request_guard = ActiveHttpRequestGuard::new(ctx.active_http_request_count.clone());
543549

544-
http_request_tasks.spawn(async move {
545-
let _request_guard = request_guard;
546-
let response = shared
547-
.config
548-
.callbacks
549-
.fetch(handle_clone, actor_id, gateway_id, request_id, request)
550-
.await;
550+
http_request_tasks.spawn(
551+
async move {
552+
let _request_guard = request_guard;
553+
let response = shared
554+
.config
555+
.callbacks
556+
.fetch(handle_clone, actor_id, gateway_id, request_id, request)
557+
.await;
551558

552-
match response {
553-
Ok(response) => {
554-
send_response(&shared, gateway_id, request_id, response).await;
555-
}
556-
Err(error) => {
557-
tracing::error!(?error, "fetch failed");
559+
match response {
560+
Ok(response) => {
561+
send_response(&shared, gateway_id, request_id, response).await;
562+
}
563+
Err(error) => {
564+
tracing::error!(?error, "fetch failed");
565+
}
558566
}
559567
}
560-
});
568+
.in_current_span(),
569+
);
561570

562571
if !req.stream {
563572
ctx.pending_requests
564573
.remove(&[&message_id.gateway_id, &message_id.request_id]);
565574
}
566575
}
567576

568-
fn handle_http_request_task_result(ctx: &ActorContext, result: Result<(), JoinError>) {
577+
fn handle_http_request_task_result(result: Result<(), JoinError>) {
569578
if let Err(error) = result {
570579
if error.is_cancelled() {
571580
return;
572581
}
573582

574-
tracing::error!(actor_id = %ctx.actor_id, ?error, "http request task failed");
583+
tracing::error!(?error, "http request task failed");
575584
}
576585
}
577586

@@ -585,15 +594,14 @@ async fn abort_and_join_http_request_tasks(
585594

586595
let active_http_request_count = ctx.active_http_request_count.load();
587596
tracing::debug!(
588-
actor_id = %ctx.actor_id,
589597
active_http_request_count,
590598
"aborting in-flight http request tasks"
591599
);
592600

593601
http_request_tasks.abort_all();
594602

595603
while let Some(result) = http_request_tasks.join_next().await {
596-
handle_http_request_task_result(ctx, result);
604+
handle_http_request_task_result(result);
597605
}
598606
}
599607

@@ -633,7 +641,7 @@ fn spawn_ws_outgoing_task(
633641
request_id: protocol::RequestId,
634642
mut outgoing_rx: mpsc::UnboundedReceiver<crate::config::WsOutgoing>,
635643
) {
636-
tokio::spawn(async move {
644+
let ws_task = async move {
637645
let mut idx: u16 = 0;
638646
while let Some(msg) = outgoing_rx.recv().await {
639647
idx += 1;
@@ -681,7 +689,8 @@ fn spawn_ws_outgoing_task(
681689
}
682690
}
683691
}
684-
});
692+
};
693+
tokio::spawn(ws_task.in_current_span());
685694
}
686695

687696
async fn handle_ws_open(
@@ -896,7 +905,6 @@ async fn handle_ws_message(
896905
if wrapping_lte_u16(received_index, previous_index) {
897906
tracing::info!(
898907
request_id = id_to_str(&message_id.request_id),
899-
actor_id = %ctx.actor_id,
900908
previous_index,
901909
received_index,
902910
"received duplicate hibernating websocket message"
@@ -908,7 +916,6 @@ async fn handle_ws_message(
908916
if received_index != expected_index {
909917
tracing::warn!(
910918
request_id = id_to_str(&message_id.request_id),
911-
actor_id = %ctx.actor_id,
912919
previous_index,
913920
expected_index,
914921
received_index,
@@ -1621,6 +1628,7 @@ mod tests {
16211628
)),
16221629
protocol_metadata: Arc::new(tokio::sync::Mutex::new(None)),
16231630
shutting_down: std::sync::atomic::AtomicBool::new(false),
1631+
stopped_tx: tokio::sync::watch::channel(true).0,
16241632
});
16251633
(shared, envoy_rx)
16261634
}

engine/sdks/rust/envoy-client/src/context.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use rivet_envoy_protocol as protocol;
77
use rivet_util::async_counter::AsyncCounter;
88
use tokio::sync::Mutex;
99
use tokio::sync::mpsc;
10+
use tokio::sync::watch;
1011

1112
use crate::actor::ToActor;
1213
use crate::config::EnvoyConfig;
@@ -29,6 +30,11 @@ pub struct SharedContext {
2930
pub ws_tx: Arc<Mutex<Option<mpsc::UnboundedSender<WsTxMessage>>>>,
3031
pub protocol_metadata: Arc<Mutex<Option<protocol::ProtocolMetadata>>>,
3132
pub shutting_down: AtomicBool,
33+
// Latched signal fired by `envoy_loop` after its cleanup block completes.
34+
// Waiters observing `true` are guaranteed that the loop has exited and
35+
// every pending KV/SQLite request has been resolved (with `EnvoyShutdownError`
36+
// if it didn't complete naturally).
37+
pub stopped_tx: watch::Sender<bool>,
3238
}
3339

3440
#[derive(Debug)]

engine/sdks/rust/envoy-client/src/envoy.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ pub fn start_envoy_sync(config: EnvoyConfig) -> EnvoyHandle {
252252
fn start_envoy_sync_inner(config: EnvoyConfig) -> EnvoyHandle {
253253
let (envoy_tx, envoy_rx) = mpsc::unbounded_channel::<ToEnvoyMessage>();
254254
let (start_tx, start_rx) = tokio::sync::watch::channel(());
255+
let (stopped_tx, _stopped_rx) = tokio::sync::watch::channel(false);
255256

256257
let envoy_key = uuid::Uuid::new_v4().to_string();
257258
let shared = Arc::new(SharedContext {
@@ -264,20 +265,14 @@ fn start_envoy_sync_inner(config: EnvoyConfig) -> EnvoyHandle {
264265
ws_tx: Arc::new(tokio::sync::Mutex::new(None)),
265266
protocol_metadata: Arc::new(tokio::sync::Mutex::new(None)),
266267
shutting_down: std::sync::atomic::AtomicBool::new(false),
268+
stopped_tx,
267269
});
268270

269271
let handle = EnvoyHandle {
270272
shared: shared.clone(),
271273
started_rx: start_rx,
272274
};
273275

274-
// Start signal handler
275-
let handle2 = handle.clone();
276-
tokio::spawn(async move {
277-
let _ = tokio::signal::ctrl_c().await;
278-
handle2.shutdown(false);
279-
});
280-
281276
start_connection(shared.clone());
282277

283278
let ctx = EnvoyContext {
@@ -459,6 +454,11 @@ async fn envoy_loop(
459454
tracing::info!("envoy stopped");
460455

461456
ctx.shared.config.callbacks.on_shutdown();
457+
458+
// Latched signal: waiters on `EnvoyHandle::wait_stopped` observe this and
459+
// any future callers of `wait_stopped` resolve immediately because watch
460+
// retains the last value.
461+
let _ = ctx.shared.stopped_tx.send(true);
462462
}
463463

464464
async fn handle_conn_message(

engine/sdks/rust/envoy-client/src/events.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ mod tests {
164164
)),
165165
protocol_metadata: Arc::new(tokio::sync::Mutex::new(None)),
166166
shutting_down: std::sync::atomic::AtomicBool::new(false),
167+
stopped_tx: tokio::sync::watch::channel(true).0,
167168
});
168169
let handle = EnvoyHandle {
169170
shared: shared.clone(),

engine/sdks/rust/envoy-client/src/handle.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,29 @@ impl EnvoyHandle {
3636
}
3737
}
3838

39+
/// Resolves when the envoy loop has finished its cleanup block.
40+
///
41+
/// Returning does NOT imply successful delivery of pending KV/SQLite/tunnel
42+
/// requests. The cleanup block errors out every outstanding request with
43+
/// `"envoy shutting down"`. Callers needing durability must wait on individual
44+
/// request acks before invoking shutdown.
45+
///
46+
/// Latched: safe to call before, during, or after the envoy loop exits.
47+
/// A waiter arriving after the loop already exited resolves immediately.
48+
pub async fn wait_stopped(&self) {
49+
let mut rx = self.shared.stopped_tx.subscribe();
50+
if *rx.borrow_and_update() {
51+
return;
52+
}
53+
let _ = rx.changed().await;
54+
}
55+
56+
/// Convenience: signal shutdown then await `wait_stopped`.
57+
pub async fn shutdown_and_wait(&self, immediate: bool) {
58+
self.shutdown(immediate);
59+
self.wait_stopped().await;
60+
}
61+
3962
pub async fn get_protocol_metadata(&self) -> Option<protocol::ProtocolMetadata> {
4063
self.shared.protocol_metadata.lock().await.clone()
4164
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"code": "shut_down",
3+
"group": "registry",
4+
"message": "Registry is shut down."
5+
}

rivetkit-rust/packages/rivetkit-core/CLAUDE.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,13 @@
2222
- Flush the actor-connect `WebSocketSender` after queuing a setup `Error` frame and before closing so the envoy writer handles the error before the close terminates the connection.
2323
- Bound actor-connect websocket setup at the registry boundary as well as inside the actor task. The HTTP upgrade can complete before `connection_open` replies, so a missing reply must still close the socket instead of idling until the client test timeout.
2424

25+
## Run modes
26+
27+
- Two run modes exist for `CoreRegistry`. **Persistent envoy**: `serve_with_config(...)` starts one outbound envoy via `start_envoy` and holds it for the process lifetime; used by standalone Rust binaries and TS `registry.start()`. **Serverless request**: `into_serverless_runtime(...)` returns a `CoreServerlessRuntime` whose `handle_request(...)` lazily starts an envoy on first request via `ensure_envoy(...)` and caches it; used by Node/Bun/Deno HTTP hosts and platform fetch handlers. Both modes end up holding a long-lived `EnvoyHandle`.
28+
- Shutdown is a property of the host's `CoreRegistry` handle, not of whichever entrypoint ran first. Route process-level shutdown (SIGINT/SIGTERM) through a single `CoreRegistry::shutdown()` that trips one shared cancel token observed by `serve_with_config` and calls `CoreServerlessRuntime::shutdown()` on the cached runtime. Never attach the shutdown signal to `serve_with_config`'s config parameter — that misses the serverless-request mode entirely, since it never calls `serve_with_config`.
29+
- `rivetkit-core` and `rivet-envoy-client` must not install process signal handlers (no `tokio::signal::ctrl_c()` in library code). `tokio::signal::ctrl_c()` calls `sigaction(SIGINT, ...)` at the POSIX level and prevents Node from exiting when rivetkit is embedded via NAPI. Signal policy belongs to the host binary or the TS registry layer.
30+
- Per-request `CancellationToken` on `handle_serverless_request` cancels a single in-flight request and does not tear down the cached envoy. Do not overload it with registry shutdown.
31+
2532
## Test harness
2633

2734
- `tests/modules/task.rs` tests that install a tracing subscriber with `set_default(...)` must take `test_hook_lock()` first, or full `cargo test` parallelism makes the log-capture assertions flaky.

rivetkit-rust/packages/rivetkit-core/examples/counter.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,19 @@ fn counter_factory() -> ActorFactory {
9999
async fn main() -> Result<()> {
100100
let mut registry = CoreRegistry::new();
101101
registry.register("counter", counter_factory());
102-
tokio::select! {
103-
res = registry.serve() => res,
104-
_ = tokio::signal::ctrl_c() => Ok(()),
102+
let token = tokio_util::sync::CancellationToken::new();
103+
let serve = tokio::spawn({
104+
let token = token.clone();
105+
async move { registry.serve(token).await }
106+
});
107+
match tokio::signal::ctrl_c().await {
108+
Ok(()) => {}
109+
Err(err) => tracing::warn!(?err, "ctrl_c install failed; cancelling anyway"),
110+
}
111+
token.cancel();
112+
match serve.await {
113+
Ok(Ok(())) => Ok(()),
114+
Ok(Err(err)) => Err(err),
115+
Err(join_err) => Err(join_err.into()),
105116
}
106117
}

0 commit comments

Comments (0)