Skip to content

Commit 5fddabc

Browse files
committed
chore(envoy-client): add envoy key tracing context
1 parent 6b5aa7b commit 5fddabc

5 files changed

Lines changed: 61 additions & 31 deletions

File tree

engine/sdks/rust/envoy-client/src/actor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ pub fn create_actor(
146146
#[tracing::instrument(
147147
skip_all,
148148
fields(
149+
envoy_key = %shared.envoy_key,
149150
actor_id = %actor_id,
150151
generation = generation,
151152
actor_key = %config.key.as_deref().unwrap_or(""),

engine/sdks/rust/envoy-client/src/connection/native.rs

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use futures_util::{SinkExt, StreamExt};
55
use rivet_envoy_protocol as protocol;
66
use tokio::sync::mpsc;
77
use tokio_tungstenite::tungstenite;
8+
use tracing::Instrument;
89
use vbare::OwnedVersionedData;
910

1011
use crate::context::{SharedContext, WsTxMessage};
@@ -14,7 +15,8 @@ use crate::utils::{BackoffOptions, calculate_backoff, parse_ws_close_reason};
1415
const STABLE_CONNECTION_MS: u64 = 60_000;
1516

1617
pub fn start_connection(shared: Arc<SharedContext>) {
17-
tokio::spawn(connection_loop(shared));
18+
let span = tracing::debug_span!("envoy_connection", envoy_key = %shared.envoy_key);
19+
tokio::spawn(connection_loop(shared).instrument(span));
1820
}
1921

2022
async fn connection_loop(shared: Arc<SharedContext>) {
@@ -118,31 +120,38 @@ async fn single_connection(
118120

119121
// Spawn write task
120122
let shared2 = shared.clone();
121-
let write_handle = tokio::spawn(async move {
122-
super::send_initial_metadata(&shared2).await;
123-
124-
while let Some(msg) = ws_rx.recv().await {
125-
match msg {
126-
WsTxMessage::Send(data) => {
127-
if let Err(e) = write.send(tungstenite::Message::Binary(data.into())).await {
128-
tracing::error!(?e, "failed to send ws message");
123+
let write_span = tracing::debug_span!("envoy_ws_write", envoy_key = %shared2.envoy_key);
124+
let write_handle = tokio::spawn(
125+
async move {
126+
super::send_initial_metadata(&shared2).await;
127+
128+
while let Some(msg) = ws_rx.recv().await {
129+
match msg {
130+
WsTxMessage::Send(data) => {
131+
let result = write
132+
.send(tungstenite::Message::Binary(data.into()))
133+
.await;
134+
if let Err(e) = result {
135+
tracing::error!(?e, "failed to send ws message");
136+
break;
137+
}
138+
}
139+
WsTxMessage::Close => {
140+
let _ = write
141+
.send(tungstenite::Message::Close(Some(
142+
tungstenite::protocol::CloseFrame {
143+
code: tungstenite::protocol::frame::coding::CloseCode::Normal,
144+
reason: "envoy.shutdown".into(),
145+
},
146+
)))
147+
.await;
129148
break;
130149
}
131150
}
132-
WsTxMessage::Close => {
133-
let _ = write
134-
.send(tungstenite::Message::Close(Some(
135-
tungstenite::protocol::CloseFrame {
136-
code: tungstenite::protocol::frame::coding::CloseCode::Normal,
137-
reason: "envoy.shutdown".into(),
138-
},
139-
)))
140-
.await;
141-
break;
142-
}
143151
}
144152
}
145-
});
153+
.instrument(write_span),
154+
);
146155

147156
let mut result = None;
148157

engine/sdks/rust/envoy-client/src/connection/wasm.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod imp {
77
use js_sys::{Array, Function, Promise, Reflect, Uint8Array};
88
use rivet_envoy_protocol as protocol;
99
use tokio::sync::mpsc;
10+
use tracing::Instrument;
1011
use vbare::OwnedVersionedData;
1112
use wasm_bindgen::{JsCast, JsValue, closure::Closure};
1213
use wasm_bindgen_futures::JsFuture;
@@ -28,7 +29,8 @@ mod imp {
2829
}
2930

3031
pub fn start_connection(shared: Arc<SharedContext>) {
31-
wasm_bindgen_futures::spawn_local(connection_loop(shared));
32+
let span = tracing::debug_span!("envoy_connection", envoy_key = %shared.envoy_key);
33+
wasm_bindgen_futures::spawn_local(connection_loop(shared).instrument(span));
3234
}
3335

3436
async fn connection_loop(shared: Arc<SharedContext>) {
@@ -102,10 +104,11 @@ mod imp {
102104

103105
let onmessage = {
104106
let event_tx = event_tx.clone();
107+
let envoy_key = shared.envoy_key.clone();
105108
Closure::<dyn FnMut(MessageEvent)>::wrap(Box::new(move |event| {
106109
let data = event.data();
107110
let Some(bytes) = decode_message_data(data) else {
108-
tracing::warn!("received non-binary websocket message");
111+
tracing::warn!(%envoy_key, "received non-binary websocket message");
109112
return;
110113
};
111114
let _ = event_tx.send(ConnectionEvent::Message(bytes));
@@ -168,6 +171,7 @@ mod imp {
168171
let shared = shared.clone();
169172
let ws = ws.clone();
170173
let event_tx = event_tx.clone();
174+
let write_span = tracing::debug_span!("envoy_ws_write", envoy_key = %shared.envoy_key);
171175
async move {
172176
super::super::send_initial_metadata(&shared).await;
173177

@@ -189,6 +193,7 @@ mod imp {
189193
}
190194
}
191195
}
196+
.instrument(write_span)
192197
});
193198

194199
let mut result = None;

engine/sdks/rust/envoy-client/src/envoy.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use crate::async_counter::AsyncCounter;
1111
use rivet_envoy_protocol as protocol;
1212
use tokio::sync::mpsc;
1313
use tokio::sync::oneshot;
14+
use tracing::Instrument;
1415

1516
use crate::actor::ToActor;
1617
use crate::commands::{ACK_COMMANDS_INTERVAL_MS, handle_commands, send_command_ack};
@@ -332,9 +333,9 @@ fn start_envoy_sync_inner(config: EnvoyConfig) -> EnvoyHandle {
332333
processed_command_idx: HashMap::new(),
333334
};
334335

335-
tracing::info!("starting envoy");
336-
337-
spawn_detached(envoy_loop(ctx, envoy_rx, start_tx));
336+
tracing::info!(envoy_key = %shared.envoy_key, "starting envoy");
337+
let span = tracing::info_span!("envoy_client", envoy_key = %shared.envoy_key);
338+
spawn_detached(envoy_loop(ctx, envoy_rx, start_tx).instrument(span));
338339

339340
handle
340341
}
@@ -603,9 +604,17 @@ async fn handle_shutdown(ctx: &mut EnvoyContext) {
603604
.collect();
604605

605606
let envoy_tx = ctx.shared.envoy_tx.clone();
606-
spawn_detached(async move {
607-
futures_util::future::join_all(actor_handles.iter().map(|h| h.closed())).await;
608-
tracing::debug!("all actors stopped during graceful shutdown");
609-
let _ = envoy_tx.send(ToEnvoyMessage::Stop);
610-
});
607+
let shutdown_span = tracing::debug_span!(
608+
parent: tracing::Span::current(),
609+
"envoy_graceful_shutdown",
610+
envoy_key = %ctx.shared.envoy_key,
611+
);
612+
spawn_detached(
613+
async move {
614+
futures_util::future::join_all(actor_handles.iter().map(|h| h.closed())).await;
615+
tracing::debug!("all actors stopped during graceful shutdown");
616+
let _ = envoy_tx.send(ToEnvoyMessage::Stop);
617+
}
618+
.instrument(shutdown_span),
619+
);
611620
}

engine/sdks/rust/envoy-client/src/handle.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,12 +541,18 @@ impl EnvoyHandle {
541541
/// Inject a serverless start payload into the envoy.
542542
/// The payload is a u16 LE protocol version followed by a serialized ToEnvoy message.
543543
pub async fn start_serverless_actor(&self, payload: &[u8]) -> anyhow::Result<()> {
544+
tracing::debug!(
545+
envoy_key = %self.shared.envoy_key,
546+
payload_len = payload.len(),
547+
"received serverless start request"
548+
);
544549
let (message, _) = decode_serverless_actor_start_payload(payload)?;
545550

546551
// Wait for envoy to be started before injecting
547552
self.started().await?;
548553

549554
tracing::debug!(
555+
envoy_key = %self.shared.envoy_key,
550556
data = crate::stringify::stringify_to_envoy(&message),
551557
"received serverless start"
552558
);

0 commit comments

Comments
 (0)