Skip to content

Commit 8747631

Browse files
committed
fix(envoy-client): fix handling actor event ack
1 parent 6728983 commit 8747631

File tree

11 files changed

+35
-1671
lines changed

11 files changed

+35
-1671
lines changed

engine/sdks/rust/envoy-client/src/commands.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,15 @@ pub async fn handle_commands(ctx: &mut EnvoyContext, commands: Vec<protocol::Com
3434
name: actor_name,
3535
event_history: Vec::new(),
3636
last_command_idx: checkpoint.index,
37+
received_stop: false,
3738
},
3839
);
3940
}
4041
protocol::Command::CommandStopActor(val) => {
4142
let entry = ctx.get_actor_entry_mut(&checkpoint.actor_id, checkpoint.generation);
4243

4344
if let Some(entry) = entry {
45+
entry.received_stop = true;
4446
entry.last_command_idx = checkpoint.index;
4547
let _ = entry.handle.send(crate::actor::ToActor::Stop {
4648
command_idx: checkpoint.index,

engine/sdks/rust/envoy-client/src/envoy.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ pub struct ActorEntry {
4141
pub name: String,
4242
pub event_history: Vec<protocol::EventWrapper>,
4343
pub last_command_idx: i64,
44+
pub received_stop: bool,
4445
}
4546

4647
pub enum ToEnvoyMessage {

engine/sdks/rust/envoy-client/src/events.rs

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,18 @@ pub async fn handle_send_events(ctx: &mut EnvoyContext, events: Vec<protocol::Ev
1010
ctx.get_actor_entry_mut(&event.checkpoint.actor_id, event.checkpoint.generation);
1111
if let Some(entry) = entry {
1212
entry.event_history.push(event.clone());
13+
14+
if let protocol::Event::EventActorStateUpdate(ref state_update) = event.inner {
15+
if matches!(
16+
state_update.state,
17+
protocol::ActorState::ActorStateStopped(_)
18+
) {
19+
// If the actor is being stopped by rivet, we don't need the entry anymore
20+
if entry.received_stop {
21+
ctx.actors.remove(&event.checkpoint.actor_id);
22+
}
23+
}
24+
}
1325
}
1426
}
1527

@@ -19,30 +31,17 @@ pub async fn handle_send_events(ctx: &mut EnvoyContext, events: Vec<protocol::Ev
1931

2032
pub fn handle_ack_events(ctx: &mut EnvoyContext, ack: protocol::ToEnvoyAckEvents) {
2133
for checkpoint in &ack.last_event_checkpoints {
22-
let actor_entry = ctx.actors.get_mut(&checkpoint.actor_id);
23-
if let Some(actor_entry) = actor_entry {
24-
let gen_entry = actor_entry.get_mut(&checkpoint.generation);
25-
let remove = if let Some(gen_entry) = gen_entry {
26-
gen_entry
27-
.event_history
28-
.retain(|event| event.checkpoint.index > checkpoint.index);
29-
30-
gen_entry.event_history.is_empty() && gen_entry.handle.is_closed()
31-
} else {
32-
false
33-
};
34-
35-
// Clean up fully acked stopped actors
36-
if remove {
37-
actor_entry.remove(&checkpoint.generation);
38-
if actor_entry.is_empty() {
39-
ctx.actors.remove(&checkpoint.actor_id);
40-
}
41-
}
34+
let entry = ctx.get_actor_entry_mut(&checkpoint.actor_id, checkpoint.generation);
35+
if let Some(entry) = entry {
36+
entry
37+
.event_history
38+
.retain(|event| event.checkpoint.index > checkpoint.index);
4239
}
4340
}
4441
}
4542

43+
// TODO: If the envoy disconnects, actor stops, then envoy reconnects, we will send the stop event but there
44+
// is no mechanism to remove the actor entry afterwards. We only remove the actor entry if rivet stops the actor.
4645
pub async fn resend_unacknowledged_events(ctx: &EnvoyContext) {
4746
let mut events: Vec<protocol::EventWrapper> = Vec::new();
4847

rivetkit-typescript/packages/rivetkit-native/wrapper.js

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,18 @@
88

99
const native = require("./index");
1010

11+
// CloseEvent was added to Node.js in v22. Polyfill for older versions.
12+
if (typeof CloseEvent === "undefined") {
13+
global.CloseEvent = class CloseEvent extends Event {
14+
constructor(type, init = {}) {
15+
super(type);
16+
this.code = init.code ?? 0;
17+
this.reason = init.reason ?? "";
18+
this.wasClean = init.wasClean ?? false;
19+
}
20+
};
21+
}
22+
1123
// Re-export protocol for consumers that need protocol types at runtime
1224
let _protocol;
1325
try {

rivetkit-typescript/packages/rivetkit/src/serverless/router.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ export function buildServerlessRouter(config: RegistryConfig) {
7575
};
7676
const runnerConfig: RegistryConfig = {
7777
...sharedConfig,
78-
token,
78+
token: config.token ?? token,
7979
};
8080
const clientConfig: RegistryConfig = {
8181
...sharedConfig,

rivetkit-typescript/packages/sqlite-native/Cargo.toml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,3 @@ tokio = { version = "1", features = ["rt"] }
1414
tracing = "0.1"
1515
async-trait = "0.1"
1616
getrandom = "0.2"
17-
18-
[profile.release]
19-
lto = true

scripts/tests/actor_spam.ts

Lines changed: 0 additions & 148 deletions
This file was deleted.

scripts/tests/load-test/.gitignore

Lines changed: 0 additions & 6 deletions
This file was deleted.

0 commit comments

Comments
 (0)