Skip to content

Commit 415545d

Browse files
committed
fix(envoy-client): complete command and request lifecycles
1 parent c546b5e commit 415545d

3 files changed

Lines changed: 37 additions & 6 deletions

File tree

engine/sdks/rust/envoy-client/src/actor.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,6 +558,7 @@ fn handle_req_start(
558558
}
559559
Err(error) => {
560560
tracing::error!(?error, "fetch failed");
561+
send_fetch_error_response(&shared, gateway_id, request_id).await;
561562
}
562563
}
563564
}
@@ -1343,6 +1344,40 @@ async fn send_response(
13431344
}
13441345
}
13451346

1347+
async fn send_fetch_error_response(
1348+
shared: &SharedContext,
1349+
gateway_id: protocol::GatewayId,
1350+
request_id: protocol::RequestId,
1351+
) {
1352+
let body = br#"{"code":"envoy_fetch_failed","message":"actor fetch failed"}"#.to_vec();
1353+
let mut headers = HashableMap::new();
1354+
headers.insert("content-length".to_string(), body.len().to_string());
1355+
headers.insert(
1356+
"x-rivet-error".to_string(),
1357+
"envoy.fetch_failed".to_string(),
1358+
);
1359+
1360+
ws_send(
1361+
shared,
1362+
protocol::ToRivet::ToRivetTunnelMessage(protocol::ToRivetTunnelMessage {
1363+
message_id: protocol::MessageId {
1364+
gateway_id,
1365+
request_id,
1366+
message_index: 0,
1367+
},
1368+
message_kind: protocol::ToRivetTunnelMessageKind::ToRivetResponseStart(
1369+
protocol::ToRivetResponseStart {
1370+
status: 500,
1371+
headers,
1372+
body: Some(body),
1373+
stream: false,
1374+
},
1375+
),
1376+
}),
1377+
)
1378+
.await;
1379+
}
1380+
13461381
#[cfg(test)]
13471382
mod tests {
13481383
use std::collections::HashMap;

engine/sdks/rust/envoy-client/src/envoy.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,7 @@ async fn handle_conn_message(
485485
}
486486
protocol::ToEnvoy::ToEnvoyCommands(commands) => {
487487
handle_commands(ctx, commands).await;
488+
send_command_ack(ctx).await;
488489
}
489490
protocol::ToEnvoy::ToEnvoyAckEvents(ack) => {
490491
handle_ack_events(ctx, ack);

engine/sdks/rust/envoy-client/src/events.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,7 @@ pub async fn handle_send_events(ctx: &mut EnvoyContext, events: Vec<protocol::Ev
2323
state_update.state,
2424
protocol::ActorState::ActorStateStopped(_)
2525
) {
26-
// If the actor is being stopped by rivet, we don't need the entry anymore
27-
if entry.received_stop {
28-
remove_after_stop = true;
29-
}
26+
remove_after_stop = true;
3027
}
3128
}
3229
}
@@ -50,8 +47,6 @@ pub fn handle_ack_events(ctx: &mut EnvoyContext, ack: protocol::ToEnvoyAckEvents
5047
}
5148
}
5249

53-
// TODO: If the envoy disconnects, actor stops, then envoy reconnects, we will send the stop event but there
54-
// is no mechanism to remove the actor entry afterwards. We only remove the actor entry if rivet stops the actor.
5550
pub async fn resend_unacknowledged_events(ctx: &EnvoyContext) {
5651
let mut events: Vec<protocol::EventWrapper> = Vec::new();
5752

0 commit comments

Comments
 (0)