Skip to content

Commit 0cf23ef

Browse files
committed
fix(pegboard): restore hibernating request ids
1 parent ecf7dd7 commit 0cf23ef

4 files changed

Lines changed: 89 additions & 3 deletions

File tree

engine/packages/pegboard/src/workflows/actor2/runtime.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,17 @@ pub async fn send_outbound(ctx: &ActivityCtx, input: &SendOutboundInput) -> Resu
358358
.await?;
359359
}
360360
Allocation::Serverful { envoy_key } => {
361+
let hibernating_requests = ctx
362+
.op(crate::ops::actor::hibernating_request::list::Input {
363+
actor_id: state.actor_id,
364+
})
365+
.await?
366+
.into_iter()
367+
.map(|request| protocol::HibernatingRequest {
368+
gateway_id: request.gateway_id,
369+
request_id: request.request_id,
370+
})
371+
.collect();
361372
let command = protocol::Command::CommandStartActor(protocol::CommandStartActor {
362373
config: protocol::ActorConfig {
363374
name: state.name.clone(),
@@ -368,9 +379,7 @@ pub async fn send_outbound(ctx: &ActivityCtx, input: &SendOutboundInput) -> Resu
368379
.as_ref()
369380
.and_then(|x| BASE64_STANDARD.decode(x).ok()),
370381
},
371-
// Empty because request ids are ephemeral. This is intercepted by guard and
372-
// populated before it reaches the runner
373-
hibernating_requests: Vec::new(),
382+
hibernating_requests,
374383
preloaded_kv: None,
375384
});
376385

rivetkit-rust/packages/rivetkit-core/src/actor/context.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1314,6 +1314,10 @@ impl ActorContext {
13141314
self.0.sleep_requested.load(Ordering::SeqCst)
13151315
}
13161316

1317+
pub(crate) fn clear_sleep_requested(&self) {
1318+
self.0.sleep_requested.store(false, Ordering::SeqCst);
1319+
}
1320+
13171321
fn keep_awake_guard(&self) -> KeepAwakeGuard {
13181322
let region = self
13191323
.keep_awake_region()

rivetkit-rust/packages/rivetkit-core/src/actor/task.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2141,6 +2141,9 @@ impl ActorTask {
21412141
"actor lifecycle transition"
21422142
);
21432143
self.lifecycle = lifecycle;
2144+
if matches!(lifecycle, LifecycleState::Started) {
2145+
self.ctx.clear_sleep_requested();
2146+
}
21442147
self.ctx
21452148
.set_started(matches!(lifecycle, LifecycleState::Started));
21462149
}

rivetkit-rust/packages/rivetkit-core/tests/task.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1207,6 +1207,76 @@ mod moved_tests {
12071207
assert_eq!(remaining_conns[0].id(), hibernating_conn.id());
12081208
}
12091209

1210+
#[tokio::test]
1211+
async fn wake_start_clears_previous_sleep_request() {
1212+
let kv = new_in_memory();
1213+
let ctx = new_with_kv("actor-wake", "task-wake", Vec::new(), "local", kv.clone());
1214+
let (_lifecycle_tx, lifecycle_rx) = mpsc::channel(4);
1215+
let (_dispatch_tx, dispatch_rx) = mpsc::channel(4);
1216+
let (events_tx, events_rx) = mpsc::channel(4);
1217+
ctx.configure_lifecycle_events(Some(events_tx));
1218+
1219+
let factory = Arc::new(ActorFactory::new(
1220+
ActorConfig {
1221+
sleep_grace_period: Duration::from_millis(200),
1222+
sleep_grace_period_overridden: true,
1223+
..ActorConfig::default()
1224+
},
1225+
|start| {
1226+
Box::pin(async move {
1227+
let mut events = start.events;
1228+
while let Some(event) = events.recv().await {
1229+
match event {
1230+
ActorEvent::SerializeState { reply, .. } => {
1231+
reply.send(Ok(Vec::new()));
1232+
}
1233+
ActorEvent::RunGracefulCleanup { reply, .. } => {
1234+
reply.send(Ok(()));
1235+
}
1236+
_ => {}
1237+
}
1238+
}
1239+
Ok(())
1240+
})
1241+
},
1242+
));
1243+
1244+
let mut task = ActorTask::new(
1245+
"actor-wake".into(),
1246+
0,
1247+
lifecycle_rx,
1248+
dispatch_rx,
1249+
events_rx,
1250+
factory,
1251+
ctx.clone(),
1252+
None,
1253+
None,
1254+
);
1255+
let (start_tx, start_rx) = oneshot::channel();
1256+
task.handle_lifecycle(LifecycleCommand::Start { reply: start_tx })
1257+
.await;
1258+
start_rx
1259+
.await
1260+
.expect("start reply should send")
1261+
.expect("start should succeed");
1262+
1263+
ctx.sleep().expect("sleep request should succeed");
1264+
assert!(ctx.sleep_requested());
1265+
1266+
task.handle_stop(ShutdownKind::Sleep)
1267+
.await
1268+
.expect("sleep stop should succeed");
1269+
let (wake_tx, wake_rx) = oneshot::channel();
1270+
task.handle_lifecycle(LifecycleCommand::Start { reply: wake_tx })
1271+
.await;
1272+
wake_rx
1273+
.await
1274+
.expect("wake reply should send")
1275+
.expect("wake should succeed");
1276+
1277+
assert!(!ctx.sleep_requested());
1278+
}
1279+
12101280
#[tokio::test]
12111281
async fn sleep_shutdown_waits_for_on_state_change_before_final_save() {
12121282
let kv = new_in_memory();

0 commit comments

Comments
 (0)