Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions engine/sdks/rust/envoy-client/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,75 @@ use crate::utils::{
BufferMap, id_to_str, spawn_detached, wrapping_add_u16, wrapping_lte_u16, wrapping_sub_u16,
};

impl std::fmt::Debug for ToActor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ToActor::Intent { intent, error } => f
.debug_struct("Intent")
.field("intent", intent)
.field("error", error)
.finish(),
ToActor::Stop {
command_idx,
reason,
} => f
.debug_struct("Stop")
.field("command_idx", command_idx)
.field("reason", reason)
.finish(),
ToActor::Lost => write!(f, "Lost"),
ToActor::SetAlarm { alarm_ts, .. } => f
.debug_struct("SetAlarm")
.field("alarm_ts", alarm_ts)
.finish_non_exhaustive(),
ToActor::ReqStart { message_id, req } => f
.debug_struct("ReqStart")
.field("message_id", message_id)
.field("req", req)
.finish(),
ToActor::ReqChunk { message_id, chunk } => f
.debug_struct("ReqChunk")
.field("message_id", message_id)
.field("chunk", chunk)
.finish(),
ToActor::ReqAbort { message_id } => f
.debug_struct("ReqAbort")
.field("message_id", message_id)
.finish(),
ToActor::WsOpen {
message_id,
path,
headers,
} => f
.debug_struct("WsOpen")
.field("message_id", message_id)
.field("path", path)
.field("headers", headers)
.finish(),
ToActor::WsMsg { message_id, msg } => f
.debug_struct("WsMsg")
.field("message_id", message_id)
.field("msg", msg)
.finish(),
ToActor::WsClose { message_id, close } => f
.debug_struct("WsClose")
.field("message_id", message_id)
.field("close", close)
.finish(),
ToActor::HwsAck {
gateway_id,
request_id,
envoy_message_index,
} => f
.debug_struct("HwsAck")
.field("gateway_id", gateway_id)
.field("request_id", request_id)
.field("envoy_message_index", envoy_message_index)
.finish(),
}
}
}

pub enum ToActor {
Intent {
intent: protocol::ActorIntent,
Expand Down Expand Up @@ -263,6 +332,8 @@ async fn actor_inner(
break;
};

tracing::trace!(?msg, "received to actor message from engine");

match msg {
ToActor::Intent { intent, error } => {
send_event(
Expand Down
82 changes: 82 additions & 0 deletions engine/sdks/rust/envoy-client/src/envoy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,84 @@ pub enum BufferedActorMessage {
},
}

impl std::fmt::Debug for ToEnvoyMessage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ToEnvoyMessage::ConnMessage { message } => f
.debug_struct("ConnMessage")
.field("message", message)
.finish(),
ToEnvoyMessage::ConnClose { evict } => {
f.debug_struct("ConnClose").field("evict", evict).finish()
}
ToEnvoyMessage::SendEvents { events } => f
.debug_struct("SendEvents")
.field("events", events)
.finish(),
ToEnvoyMessage::KvRequest { actor_id, data, .. } => f
.debug_struct("KvRequest")
.field("actor_id", actor_id)
.field("data", data)
.finish_non_exhaustive(),
ToEnvoyMessage::SqliteRequest { request, .. } => f
.debug_struct("SqliteRequest")
.field("request", request)
.finish_non_exhaustive(),
ToEnvoyMessage::RemoteSqliteRequest { request, .. } => f
.debug_struct("RemoteSqliteRequest")
.field("request", request)
.finish_non_exhaustive(),
ToEnvoyMessage::BufferTunnelMsg { msg } => {
f.debug_struct("BufferTunnelMsg").field("msg", msg).finish()
}
ToEnvoyMessage::ActorIntent {
actor_id,
generation,
intent,
error,
} => f
.debug_struct("ActorIntent")
.field("actor_id", actor_id)
.field("generation", generation)
.field("intent", intent)
.field("error", error)
.finish(),
ToEnvoyMessage::SetAlarm {
actor_id,
generation,
alarm_ts,
..
} => f
.debug_struct("SetAlarm")
.field("actor_id", actor_id)
.field("generation", generation)
.field("alarm_ts", alarm_ts)
.finish_non_exhaustive(),
ToEnvoyMessage::HwsAck {
gateway_id,
request_id,
envoy_message_index,
} => f
.debug_struct("HwsAck")
.field("gateway_id", gateway_id)
.field("request_id", request_id)
.field("envoy_message_index", envoy_message_index)
.finish(),
ToEnvoyMessage::GetActor {
actor_id,
generation,
..
} => f
.debug_struct("GetActor")
.field("actor_id", actor_id)
.field("generation", generation)
.finish_non_exhaustive(),
ToEnvoyMessage::Shutdown => write!(f, "Shutdown"),
ToEnvoyMessage::Stop => write!(f, "Stop"),
}
}
}

pub enum ToEnvoyMessage {
ConnMessage {
message: protocol::ToEnvoy,
Expand Down Expand Up @@ -354,6 +432,8 @@ async fn envoy_loop(
msg = rx.recv() => {
let Some(msg) = msg else { break };

tracing::trace!(?msg, "received to envoy message");

match msg {
ToEnvoyMessage::ConnMessage { message } => {
lost_timeout = handle_conn_message(&mut ctx, &start_tx, lost_timeout, message).await;
Expand All @@ -364,9 +444,11 @@ async fn envoy_loop(
if evict { break; }
}
ToEnvoyMessage::SendEvents { events } => {
tracing::trace!(?events, "handling send events");
handle_send_events(&mut ctx, events).await;
}
ToEnvoyMessage::KvRequest { actor_id, data, response_tx } => {
tracing::trace!(?actor_id, ?data, "handling kv request");
handle_kv_request(&mut ctx, actor_id, data, response_tx).await;
}
ToEnvoyMessage::SqliteRequest { request, response_tx } => {
Expand Down
2 changes: 1 addition & 1 deletion engine/sdks/rust/envoy-client/src/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::envoy::EnvoyContext;
use crate::kv::KV_EXPIRE_MS;
use crate::utils::{EnvoyShutdownError, RemoteSqliteIndeterminateResultError};

#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum SqliteRequest {
GetPages(protocol::SqliteGetPagesRequest),
Commit(protocol::SqliteCommitRequest),
Expand Down
46 changes: 44 additions & 2 deletions rivetkit-rust/packages/rivetkit-core/src/actor/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,37 @@ pub enum DispatchCommand {
},
}

impl std::fmt::Debug for DispatchCommand {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Action { name, .. } => f
.debug_struct("Action")
.field("name", name)
.finish_non_exhaustive(),
Self::QueueSend {
name,
wait,
timeout_ms,
..
} => f
.debug_struct("QueueSend")
.field("name", name)
.field("wait", wait)
.field("timeout_ms", timeout_ms)
.finish_non_exhaustive(),
Self::Http { .. } => f.debug_struct("Http").finish_non_exhaustive(),
Self::OpenWebSocket { .. } => f.debug_struct("OpenWebSocket").finish_non_exhaustive(),
Self::WorkflowHistory { .. } => {
f.debug_struct("WorkflowHistory").finish_non_exhaustive()
}
Self::WorkflowReplay { entry_id, .. } => f
.debug_struct("WorkflowReplay")
.field("entry_id", entry_id)
.finish_non_exhaustive(),
}
}
}

impl DispatchCommand {
fn kind(&self) -> &'static str {
match self {
Expand Down Expand Up @@ -566,6 +597,7 @@ impl ActorTask {
lifecycle_command = self.lifecycle_inbox.recv() => {
match lifecycle_command {
Some(command) => {
tracing::trace!(command_kind = command.kind(), reason = ?command.stop_reason(), "received lifecycle command");
if let Some(exit) = self.handle_lifecycle(command).await {
return exit;
}
Expand All @@ -581,7 +613,10 @@ impl ActorTask {
}
lifecycle_event = self.lifecycle_events.recv() => {
match lifecycle_event {
Some(event) => self.handle_event(event).await,
Some(event) => {
tracing::trace!(?event, "received lifecycle event");
self.handle_event(event).await
}
None => {
self.log_closed_channel(
"lifecycle_events",
Expand All @@ -604,7 +639,10 @@ impl ActorTask {
}
dispatch_command = self.dispatch_inbox.recv(), if self.accepting_dispatch() => {
match dispatch_command {
Some(command) => self.handle_dispatch(command).await,
Some(command) => {
tracing::trace!(?command, "received dispatch command");
self.handle_dispatch(command).await
}
None => {
self.log_closed_channel(
"dispatch_inbox",
Expand All @@ -615,17 +653,21 @@ impl ActorTask {
}
}
outcome = Self::wait_for_run_handle(self.run_handle.as_mut()), if self.run_handle.is_some() => {
tracing::trace!("checking user runtime run handle outcome");
if let Some(exit) = self.handle_run_handle_outcome(outcome) {
return exit;
}
}
_ = Self::state_save_tick(self.state_save_deadline), if self.state_save_timer_active() => {
tracing::trace!("state save tick");
self.on_state_save_tick().await;
}
_ = Self::inspector_serialize_state_tick(self.inspector_serialize_state_deadline), if self.inspector_serialize_timer_active() => {
tracing::trace!("serialize state save tick");
self.on_inspector_serialize_state_tick().await;
}
_ = Self::sleep_tick(self.sleep_deadline), if self.sleep_timer_active() => {
tracing::trace!("sleep tick");
self.on_sleep_tick().await;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ impl ActorContext {
self.inner.wait_until(async move {
let _region = region;
if let Err(error) = promise.await {
tracing::warn!(?error, "actor keep_awake promise rejected");
tracing::debug!(?error, "actor keep_awake promise rejected");
}
});
Ok(())
Expand Down Expand Up @@ -614,7 +614,7 @@ impl ActorContext {
self.shared
.register_task(Box::pin(async move {
if let Err(error) = promise.await {
tracing::warn!(?error, "actor keep_awake promise rejected");
tracing::debug!(?error, "actor keep_awake promise rejected");
}
}))
.map_err(napi_anyhow_error)
Expand Down
31 changes: 20 additions & 11 deletions rivetkit-typescript/packages/rivetkit-napi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,26 @@ fn anyhow_to_bridge_rivet_error_payload(error: anyhow::Error) -> serde_json::Val
"public": public_,
"statusCode": status_code,
});
tracing::error!(
group = error.group(),
code = error.code(),
message = %error.message(),
metadata = ?error.metadata(),
error_chain = ?error_chain,
has_metadata = error.metadata().is_some(),
?public_,
?status_code,
"encoded structured bridge error"
);
if public_.unwrap_or(false) {
tracing::debug!(
group = error.group(),
code = error.code(),
message = %error.message(),
metadata = ?error.metadata(),
?status_code,
"user-facing bridge error"
);
} else {
tracing::error!(
group = error.group(),
code = error.code(),
message = %error.message(),
metadata = ?error.metadata(),
error_chain = ?error_chain,
?status_code,
"internal bridge error"
);
}
payload
}

Expand Down
Loading