Skip to content

Commit 4ade2d1

Browse files
committed
fix(pegboard): restore hibernating requests on serverful start
1 parent 9bf8eb7 commit 4ade2d1

5 files changed

Lines changed: 31 additions & 4 deletions

File tree

engine/packages/pegboard-envoy/src/conn.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use scc::HashMap;
2121
use universaldb::prelude::*;
2222
use vbare::OwnedVersionedData;
2323

24-
use crate::{actor_lifecycle, errors, metrics, utils::UrlData};
24+
use crate::{actor_lifecycle, errors, hibernating_requests, metrics, utils::UrlData};
2525

2626
pub type RemoteSqliteExecutors =
2727
HashMap<(String, u64), Arc<tokio::sync::OnceCell<NativeDatabaseHandle>>>;
@@ -333,6 +333,7 @@ pub async fn init_conn(
333333
if !missed_commands.is_empty() {
334334
let replay_result: Result<()> = async {
335335
for cmd_wrapper in &mut missed_commands {
336+
hibernating_requests::refresh_command_wrapper(ctx, cmd_wrapper).await?;
336337
if let protocol::Command::CommandStopActor(_) = cmd_wrapper.inner {
337338
actor_lifecycle::stop_actor(&conn, &cmd_wrapper.checkpoint).await?;
338339
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
use anyhow::Context;
2+
use gas::prelude::*;
3+
use rivet_envoy_protocol as protocol;
4+
5+
pub(crate) async fn refresh_command_wrapper(
6+
ctx: &StandaloneCtx,
7+
command_wrapper: &mut protocol::CommandWrapper,
8+
) -> Result<()> {
9+
let protocol::Command::CommandStartActor(start) = &mut command_wrapper.inner else {
10+
return Ok(());
11+
};
12+
13+
let actor_id =
14+
Id::parse(&command_wrapper.checkpoint.actor_id).context("invalid command actor id")?;
15+
start.hibernating_requests = ctx
16+
.op(pegboard::ops::actor::hibernating_request::list::Input { actor_id })
17+
.await?
18+
.into_iter()
19+
.map(|request| protocol::HibernatingRequest {
20+
gateway_id: request.gateway_id,
21+
request_id: request.request_id,
22+
})
23+
.collect();
24+
25+
Ok(())
26+
}

engine/packages/pegboard-envoy/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ mod actor_event_demuxer;
1515
mod actor_lifecycle;
1616
mod conn;
1717
mod errors;
18+
mod hibernating_requests;
1819
mod metrics;
1920
mod ping_task;
2021
pub mod sqlite_runtime;

engine/packages/pegboard-envoy/src/tunnel_to_ws_task.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use universalpubsub as ups;
99
use universalpubsub::{NextOutput, PublishOpts, Subscriber};
1010
use vbare::OwnedVersionedData;
1111

12-
use crate::{LifecycleResult, actor_lifecycle, conn::Conn, metrics};
12+
use crate::{LifecycleResult, actor_lifecycle, conn::Conn, hibernating_requests, metrics};
1313

1414
#[tracing::instrument(name="tunnel_to_ws_task", skip_all, fields(ray_id=?ctx.ray_id(), req_id=?ctx.req_id(), envoy_key=%conn.envoy_key, protocol_version=%conn.protocol_version))]
1515
pub async fn task(
@@ -126,6 +126,7 @@ async fn handle_message(
126126
protocol::ToEnvoyConn::ToEnvoyCommands(mut command_wrappers) => {
127127
// TODO: Parallelize
128128
for command_wrapper in &mut command_wrappers {
129+
hibernating_requests::refresh_command_wrapper(ctx, command_wrapper).await?;
129130
if let protocol::Command::CommandStopActor(_) = &command_wrapper.inner {
130131
actor_lifecycle::stop_actor(conn, &command_wrapper.checkpoint).await?;
131132
}

engine/packages/pegboard/src/workflows/actor2/runtime.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,8 +368,6 @@ pub async fn send_outbound(ctx: &ActivityCtx, input: &SendOutboundInput) -> Resu
368368
.as_ref()
369369
.and_then(|x| BASE64_STANDARD.decode(x).ok()),
370370
},
371-
// Empty because request ids are ephemeral. This is intercepted by guard and
372-
// populated before it reaches the runner
373371
hibernating_requests: Vec::new(),
374372
preloaded_kv: None,
375373
});

0 commit comments

Comments
 (0)