Skip to content

Commit 6512e52

Browse files
committed
fix(rivetkit-core): keep sleeping actors reachable during grace
1 parent: e37a5c2 · commit: 6512e52

3 files changed

Lines changed: 97 additions & 76 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/actor/task.rs

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -2159,11 +2159,14 @@ impl ActorTask {
21592159
// only meant to stop the previous generation.
21602160
self.ctx.reset_abort_signal_for_start();
21612161
self.ctx.clear_sleep_requested();
2162+
}
2163+
self.ctx
2164+
.set_started(matches!(
2165+
lifecycle,
2166+
LifecycleState::Started | LifecycleState::SleepGrace
2167+
));
21622168
}
2163-
self.ctx
2164-
.set_started(matches!(lifecycle, LifecycleState::Started));
21652169
}
2166-
}
21672170

21682171
fn shutdown_reason_label(reason: ShutdownKind) -> &'static str {
21692172
match reason {

rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs

Lines changed: 87 additions & 65 deletions
Original file line number | Diff line number | Diff line change
@@ -103,20 +103,23 @@ type ActiveActorInstance = Arc<ActorTaskHandle>;
103103

104104
enum ActorInstanceState {
105105
Active(ActiveActorInstance),
106-
Stopping(ActiveActorInstance),
106+
Stopping {
107+
instance: ActiveActorInstance,
108+
reason: ShutdownKind,
109+
},
107110
}
108111

109112
impl ActorInstanceState {
110113
fn instance(&self) -> ActiveActorInstance {
111114
match self {
112-
Self::Active(instance) | Self::Stopping(instance) => instance.clone(),
115+
Self::Active(instance) | Self::Stopping { instance, .. } => instance.clone(),
113116
}
114117
}
115118

116119
fn active_instance(&self) -> Option<ActiveActorInstance> {
117120
match self {
118121
Self::Active(instance) => Some(instance.clone()),
119-
Self::Stopping(_) => None,
122+
Self::Stopping { .. } => None,
120123
}
121124
}
122125
}
@@ -675,19 +678,20 @@ impl RegistryDispatcher {
675678
.remove_async(&request.actor_id.clone())
676679
.await
677680
.map(|(_, pending_stop)| pending_stop);
678-
if let Some(pending_stop) = pending_stop {
679-
let actor_id = request.actor_id.clone();
680-
if matches!(
681-
map_envoy_stop_reason(&pending_stop.reason),
682-
ShutdownKind::Destroy
683-
) {
684-
instance.ctx.mark_destroy_requested();
685-
}
686-
self.set_actor_instance_state(
687-
actor_id.clone(),
688-
ActorInstanceState::Stopping(instance.clone()),
689-
)
690-
.await;
681+
if let Some(pending_stop) = pending_stop {
682+
let actor_id = request.actor_id.clone();
683+
let stop_reason = map_envoy_stop_reason(&pending_stop.reason);
684+
if matches!(stop_reason, ShutdownKind::Destroy) {
685+
instance.ctx.mark_destroy_requested();
686+
}
687+
self.set_actor_instance_state(
688+
actor_id.clone(),
689+
ActorInstanceState::Stopping {
690+
instance: instance.clone(),
691+
reason: stop_reason,
692+
},
693+
)
694+
.await;
691695
let _ = self
692696
.starting_instances
693697
.remove_async(&request.actor_id.clone())
@@ -753,15 +757,22 @@ impl RegistryDispatcher {
753757
}
754758
}
755759

756-
async fn transition_actor_to_stopping(&self, actor_id: &str) -> Option<ActiveActorInstance> {
757-
match self.actor_instances.entry_async(actor_id.to_owned()).await {
758-
SccEntry::Occupied(mut entry) => {
759-
let instance = entry.get().instance();
760-
if matches!(entry.get(), ActorInstanceState::Active(_)) {
761-
entry.insert(ActorInstanceState::Stopping(instance.clone()));
762-
} else {
763-
instance
764-
.ctx
760+
async fn transition_actor_to_stopping(
761+
&self,
762+
actor_id: &str,
763+
reason: ShutdownKind,
764+
) -> Option<ActiveActorInstance> {
765+
match self.actor_instances.entry_async(actor_id.to_owned()).await {
766+
SccEntry::Occupied(mut entry) => {
767+
let instance = entry.get().instance();
768+
if matches!(entry.get(), ActorInstanceState::Active(_)) {
769+
entry.insert(ActorInstanceState::Stopping {
770+
instance: instance.clone(),
771+
reason,
772+
});
773+
} else {
774+
instance
775+
.ctx
765776
.warn_work_sent_to_stopping_instance("stop_actor");
766777
}
767778
Some(instance)
@@ -776,10 +787,12 @@ impl RegistryDispatcher {
776787
async fn remove_stopping_actor_instance(&self, actor_id: &str, expected: &ActiveActorInstance) {
777788
match self.actor_instances.entry_async(actor_id.to_owned()).await {
778789
SccEntry::Occupied(entry) => {
779-
let should_remove = match entry.get() {
780-
ActorInstanceState::Stopping(instance) => Arc::ptr_eq(instance, expected),
781-
ActorInstanceState::Active(_) => false,
782-
};
790+
let should_remove = match entry.get() {
791+
ActorInstanceState::Stopping { instance, .. } => {
792+
Arc::ptr_eq(instance, expected)
793+
}
794+
ActorInstanceState::Active(_) => false,
795+
};
783796
if should_remove {
784797
let _ = entry.remove_entry();
785798
}
@@ -793,44 +806,49 @@ impl RegistryDispatcher {
793806
async fn active_actor(&self, actor_id: &str) -> Result<Arc<ActorTaskHandle>> {
794807
if let Some(instance) = self.actor_instances.get_async(&actor_id.to_owned()).await {
795808
match instance.get() {
796-
ActorInstanceState::Active(instance) => {
797-
let instance = instance.clone();
798-
if instance.ctx.started() {
799-
if instance.ctx.destroy_requested() || instance.ctx.sleep_requested() {
800-
instance
801-
.ctx
802-
.warn_work_sent_to_stopping_instance("active_actor");
803-
return Err(if instance.ctx.destroy_requested() {
804-
ActorLifecycleError::Destroying.build()
805-
} else {
806-
ActorLifecycleError::Stopping.build()
807-
});
809+
ActorInstanceState::Active(instance) => {
810+
let instance = instance.clone();
811+
if instance.ctx.started() {
812+
if instance.ctx.destroy_requested() {
813+
instance
814+
.ctx
815+
.warn_work_sent_to_stopping_instance("active_actor");
816+
return Err(ActorLifecycleError::Destroying.build());
817+
}
818+
return Ok(instance);
808819
}
809-
return Ok(instance);
810-
}
811820

812821
instance
813822
.ctx
814823
.warn_work_sent_to_stopping_instance("active_actor");
815-
return Err(if instance.ctx.destroy_requested() {
816-
ActorLifecycleError::Destroying.build()
817-
} else {
818-
ActorLifecycleError::Starting.build()
819-
});
820-
}
821-
ActorInstanceState::Stopping(instance) => {
822-
let instance = instance.clone();
823-
instance
824-
.ctx
825-
.warn_work_sent_to_stopping_instance("active_actor");
826-
return Err(if instance.ctx.destroy_requested() {
827-
ActorLifecycleError::Destroying.build()
828-
} else {
829-
ActorLifecycleError::Stopping.build()
830-
});
824+
return Err(if instance.ctx.destroy_requested() {
825+
ActorLifecycleError::Destroying.build()
826+
} else if instance.ctx.sleep_requested() {
827+
ActorLifecycleError::Stopping.build()
828+
} else {
829+
ActorLifecycleError::Starting.build()
830+
});
831+
}
832+
ActorInstanceState::Stopping { instance, reason } => {
833+
let instance = instance.clone();
834+
match reason {
835+
ShutdownKind::Sleep if instance.ctx.started() => return Ok(instance),
836+
ShutdownKind::Sleep => {
837+
instance
838+
.ctx
839+
.warn_work_sent_to_stopping_instance("active_actor");
840+
return Err(ActorLifecycleError::Stopping.build());
841+
}
842+
ShutdownKind::Destroy => {
843+
instance
844+
.ctx
845+
.warn_work_sent_to_stopping_instance("active_actor");
846+
return Err(ActorLifecycleError::Destroying.build());
847+
}
848+
}
849+
}
831850
}
832851
}
833-
}
834852

835853
tracing::warn!(actor_id, "actor instance not found");
836854
Err(ActorRuntime::NotFound {
@@ -865,9 +883,13 @@ impl RegistryDispatcher {
865883
return Ok(());
866884
}
867885

868-
let instance = match self.transition_actor_to_stopping(actor_id).await {
869-
Some(instance) => instance,
870-
None => {
886+
let task_stop_reason = map_envoy_stop_reason(&reason);
887+
let instance = match self
888+
.transition_actor_to_stopping(actor_id, task_stop_reason)
889+
.await
890+
{
891+
Some(instance) => instance,
892+
None => {
871893
let _ = self
872894
.pending_stops
873895
.insert_async(
@@ -881,8 +903,8 @@ impl RegistryDispatcher {
881903
return Ok(());
882904
}
883905
};
884-
let result = self
885-
.shutdown_started_instance(actor_id, instance.clone(), reason, stop_handle)
906+
let result = self
907+
.shutdown_started_instance(actor_id, instance.clone(), reason, stop_handle)
886908
.await;
887909
self.remove_stopping_actor_instance(actor_id, &instance)
888910
.await;

rivetkit-typescript/packages/rivetkit/tests/driver/actor-sleep-db.test.ts

Lines changed: 4 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -428,8 +428,7 @@ describeDriverMatrix("Actor Sleep Db", (driverTestConfig) => {
428428
expect(events).toContain("sleep-end");
429429
});
430430

431-
// TODO(#4705): Root-cause handle action dispatch ordering during sleep shutdown and re-enable this coverage.
432-
test.skip("action via handle during sleep shutdown is not queued", async (c) => {
431+
test("action via handle during sleep shutdown is not queued", async (c) => {
433432
const { client } = await setupDriverTest(c, driverTestConfig);
434433

435434
const handle = client.sleepWithDbAction.getOrCreate([
@@ -647,8 +646,7 @@ describeDriverMatrix("Actor Sleep Db", (driverTestConfig) => {
647646
);
648647
});
649648

650-
// TODO(#4705): Root-cause connection action dispatch ordering during sleep shutdown and re-enable this coverage.
651-
test.skip("action via WebSocket connection during sleep shutdown is not queued", async (c) => {
649+
test("action via WebSocket connection during sleep shutdown is not queued", async (c) => {
652650
const { client } = await setupDriverTest(c, driverTestConfig);
653651

654652
const handle = client.sleepWithDbAction.getOrCreate([
@@ -698,8 +696,7 @@ describeDriverMatrix("Actor Sleep Db", (driverTestConfig) => {
698696
expect(events).not.toContain("ws-during-sleep");
699697
}
700698
});
701-
// TODO(#4705): Root-cause new connection behavior during sleep shutdown and re-enable this coverage.
702-
test.skip("new connections rejected during sleep shutdown", async (c) => {
699+
test("new connections rejected during sleep shutdown", async (c) => {
703700
const { client } = await setupDriverTest(c, driverTestConfig);
704701

705702
// The sleepWithDbAction actor has a 500ms delay in
@@ -744,8 +741,7 @@ describeDriverMatrix("Actor Sleep Db", (driverTestConfig) => {
744741
await secondConn.dispose();
745742
});
746743

747-
// TODO(#4705): Root-cause raw WebSocket admission during sleep shutdown and re-enable this coverage.
748-
test.skip("new raw WebSocket during sleep shutdown is rejected or queued", async (c) => {
744+
test("new raw WebSocket during sleep shutdown is rejected or queued", async (c) => {
749745
const { client } = await setupDriverTest(c, driverTestConfig);
750746

751747
// The sleepWithRawWs actor has a 500ms delay in onSleep.

0 commit comments

Comments
 (0)