Skip to content

Commit b0ace70

Browse files
committed
fix(pegboard): restore hibernating request ids
1 parent 07da44c commit b0ace70

2 files changed

Lines changed: 82 additions & 1 deletion

File tree

  • engine/packages/pegboard/src/workflows/actor2
  • rivetkit-rust/packages/rivetkit-core/tests

engine/packages/pegboard/src/workflows/actor2/runtime.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,17 @@ pub async fn send_outbound(ctx: &ActivityCtx, input: &SendOutboundInput) -> Resu
358358
.await?;
359359
}
360360
Allocation::Serverful { envoy_key } => {
361+
let hibernating_requests = ctx
362+
.op(crate::ops::actor::hibernating_request::list::Input {
363+
actor_id: state.actor_id,
364+
})
365+
.await?
366+
.into_iter()
367+
.map(|request| protocol::HibernatingRequest {
368+
gateway_id: request.gateway_id,
369+
request_id: request.request_id,
370+
})
371+
.collect();
361372
let command = protocol::Command::CommandStartActor(protocol::CommandStartActor {
362373
config: protocol::ActorConfig {
363374
name: state.name.clone(),
@@ -368,7 +379,7 @@ pub async fn send_outbound(ctx: &ActivityCtx, input: &SendOutboundInput) -> Resu
368379
.as_ref()
369380
.and_then(|x| BASE64_STANDARD.decode(x).ok()),
370381
},
371-
hibernating_requests: Vec::new(),
382+
hibernating_requests,
372383
preloaded_kv: None,
373384
});
374385

rivetkit-rust/packages/rivetkit-core/tests/task.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1208,6 +1208,76 @@ mod moved_tests {
12081208
assert_eq!(remaining_conns[0].id(), hibernating_conn.id());
12091209
}
12101210

1211+
#[tokio::test]
1212+
async fn wake_start_clears_previous_sleep_request() {
1213+
let kv = new_in_memory();
1214+
let ctx = new_with_kv("actor-wake", "task-wake", Vec::new(), "local", kv.clone());
1215+
let (_lifecycle_tx, lifecycle_rx) = mpsc::channel(4);
1216+
let (_dispatch_tx, dispatch_rx) = mpsc::channel(4);
1217+
let (events_tx, events_rx) = mpsc::channel(4);
1218+
ctx.configure_lifecycle_events(Some(events_tx));
1219+
1220+
let factory = Arc::new(ActorFactory::new(
1221+
ActorConfig {
1222+
sleep_grace_period: Duration::from_millis(200),
1223+
sleep_grace_period_overridden: true,
1224+
..ActorConfig::default()
1225+
},
1226+
|start| {
1227+
Box::pin(async move {
1228+
let mut events = start.events;
1229+
while let Some(event) = events.recv().await {
1230+
match event {
1231+
ActorEvent::SerializeState { reply, .. } => {
1232+
reply.send(Ok(Vec::new()));
1233+
}
1234+
ActorEvent::RunGracefulCleanup { reply, .. } => {
1235+
reply.send(Ok(()));
1236+
}
1237+
_ => {}
1238+
}
1239+
}
1240+
Ok(())
1241+
})
1242+
},
1243+
));
1244+
1245+
let mut task = ActorTask::new(
1246+
"actor-wake".into(),
1247+
0,
1248+
lifecycle_rx,
1249+
dispatch_rx,
1250+
events_rx,
1251+
factory,
1252+
ctx.clone(),
1253+
None,
1254+
None,
1255+
);
1256+
let (start_tx, start_rx) = oneshot::channel();
1257+
task.handle_lifecycle(LifecycleCommand::Start { reply: start_tx })
1258+
.await;
1259+
start_rx
1260+
.await
1261+
.expect("start reply should send")
1262+
.expect("start should succeed");
1263+
1264+
ctx.sleep().expect("sleep request should succeed");
1265+
assert!(ctx.sleep_requested());
1266+
1267+
task.handle_stop(ShutdownKind::Sleep)
1268+
.await
1269+
.expect("sleep stop should succeed");
1270+
let (wake_tx, wake_rx) = oneshot::channel();
1271+
task.handle_lifecycle(LifecycleCommand::Start { reply: wake_tx })
1272+
.await;
1273+
wake_rx
1274+
.await
1275+
.expect("wake reply should send")
1276+
.expect("wake should succeed");
1277+
1278+
assert!(!ctx.sleep_requested());
1279+
}
1280+
12111281
#[tokio::test]
12121282
async fn sleep_shutdown_waits_for_on_state_change_before_final_save() {
12131283
let kv = new_in_memory();

0 commit comments

Comments
 (0)