Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions engine/sdks/rust/envoy-client/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ pub async fn handle_commands(ctx: &mut EnvoyContext, commands: Vec<protocol::Com
name: actor_name,
event_history: Vec::new(),
last_command_idx: checkpoint.index,
received_stop: false,
},
);
}
protocol::Command::CommandStopActor(val) => {
let entry = ctx.get_actor_entry_mut(&checkpoint.actor_id, checkpoint.generation);

if let Some(entry) = entry {
entry.received_stop = true;
entry.last_command_idx = checkpoint.index;
let _ = entry.handle.send(crate::actor::ToActor::Stop {
command_idx: checkpoint.index,
Expand Down
1 change: 1 addition & 0 deletions engine/sdks/rust/envoy-client/src/envoy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub struct ActorEntry {
pub name: String,
pub event_history: Vec<protocol::EventWrapper>,
pub last_command_idx: i64,
pub received_stop: bool,
}

pub enum ToEnvoyMessage {
Expand Down
39 changes: 19 additions & 20 deletions engine/sdks/rust/envoy-client/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ pub async fn handle_send_events(ctx: &mut EnvoyContext, events: Vec<protocol::Ev
ctx.get_actor_entry_mut(&event.checkpoint.actor_id, event.checkpoint.generation);
if let Some(entry) = entry {
entry.event_history.push(event.clone());

if let protocol::Event::EventActorStateUpdate(ref state_update) = event.inner {
if matches!(
state_update.state,
protocol::ActorState::ActorStateStopped(_)
) {
// If the actor is being stopped by rivet, we don't need the entry anymore
if entry.received_stop {
ctx.actors.remove(&event.checkpoint.actor_id);
}
}
}
}
}

Expand All @@ -19,30 +31,17 @@ pub async fn handle_send_events(ctx: &mut EnvoyContext, events: Vec<protocol::Ev

pub fn handle_ack_events(ctx: &mut EnvoyContext, ack: protocol::ToEnvoyAckEvents) {
for checkpoint in &ack.last_event_checkpoints {
let actor_entry = ctx.actors.get_mut(&checkpoint.actor_id);
if let Some(actor_entry) = actor_entry {
let gen_entry = actor_entry.get_mut(&checkpoint.generation);
let remove = if let Some(gen_entry) = gen_entry {
gen_entry
.event_history
.retain(|event| event.checkpoint.index > checkpoint.index);

gen_entry.event_history.is_empty() && gen_entry.handle.is_closed()
} else {
false
};

// Clean up fully acked stopped actors
if remove {
actor_entry.remove(&checkpoint.generation);
if actor_entry.is_empty() {
ctx.actors.remove(&checkpoint.actor_id);
}
}
let entry = ctx.get_actor_entry_mut(&checkpoint.actor_id, checkpoint.generation);
if let Some(entry) = entry {
entry
.event_history
.retain(|event| event.checkpoint.index > checkpoint.index);
}
}
}

// TODO: If the envoy disconnects, actor stops, then envoy reconnects, we will send the stop event but there
// is no mechanism to remove the actor entry afterwards. We only remove the actor entry if rivet stops the actor.
pub async fn resend_unacknowledged_events(ctx: &EnvoyContext) {
let mut events: Vec<protocol::EventWrapper> = Vec::new();

Expand Down
12 changes: 12 additions & 0 deletions rivetkit-typescript/packages/rivetkit-native/wrapper.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@

const native = require("./index");

// CloseEvent was added to Node.js in v22. Polyfill for older versions.
if (typeof CloseEvent === "undefined") {
global.CloseEvent = class CloseEvent extends Event {
constructor(type, init = {}) {
super(type);
this.code = init.code ?? 0;
this.reason = init.reason ?? "";
this.wasClean = init.wasClean ?? false;
}
};
}

// Re-export protocol for consumers that need protocol types at runtime
let _protocol;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export function buildServerlessRouter(config: RegistryConfig) {
};
const runnerConfig: RegistryConfig = {
...sharedConfig,
token,
token: config.token ?? token,
};
const clientConfig: RegistryConfig = {
...sharedConfig,
Expand Down
3 changes: 0 additions & 3 deletions rivetkit-typescript/packages/sqlite-native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,3 @@ tokio = { version = "1", features = ["rt"] }
tracing = "0.1"
async-trait = "0.1"
getrandom = "0.2"

[profile.release]
lto = true
148 changes: 0 additions & 148 deletions scripts/tests/actor_spam.ts

This file was deleted.

6 changes: 0 additions & 6 deletions scripts/tests/load-test/.gitignore

This file was deleted.

Loading
Loading