From 5fdf3add90d743ce39b011c38746de7dbefde116 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Mon, 6 Apr 2026 12:17:03 -0700 Subject: [PATCH] fix(envoy): use global instance, add signal handlers --- .../typescript/envoy-client/src/config.ts | 7 ++- .../typescript/envoy-client/src/handle.ts | 2 +- .../envoy-client/src/tasks/envoy/events.ts | 9 +--- .../envoy-client/src/tasks/envoy/index.ts | 47 +++++++++---------- .../packages/rivetkit/runtime/index.ts | 2 +- .../src/drivers/engine/actor-driver.ts | 28 ++++------- 6 files changed, 38 insertions(+), 57 deletions(-) diff --git a/engine/sdks/typescript/envoy-client/src/config.ts b/engine/sdks/typescript/envoy-client/src/config.ts index 37720ed78f..bb656c3f06 100644 --- a/engine/sdks/typescript/envoy-client/src/config.ts +++ b/engine/sdks/typescript/envoy-client/src/config.ts @@ -12,6 +12,11 @@ export interface EnvoyConfig { poolName: string; prepopulateActorNames: Record }>; metadata?: Record; + /** + * When startEnvoy is called, create a new envoy every time instead of using a single global envoy + * instance for the entire runtime. + */ + notGlobal?: boolean; /** * Debug option to inject artificial latency (in ms) into WebSocket @@ -161,5 +166,5 @@ export interface EnvoyConfig { generation: number, reason: protocol.StopActorReason, ) => Promise; - onShutdown: (reason: ShutdownReason) => void; + onShutdown: () => void; } diff --git a/engine/sdks/typescript/envoy-client/src/handle.ts b/engine/sdks/typescript/envoy-client/src/handle.ts index 8a0e783c2f..3dc703a9fa 100644 --- a/engine/sdks/typescript/envoy-client/src/handle.ts +++ b/engine/sdks/typescript/envoy-client/src/handle.ts @@ -96,5 +96,5 @@ export interface EnvoyHandle { clientMessageIndex: number, ): void; - startServerless(payload: ArrayBuffer): void; + startServerlessActor(payload: ArrayBuffer): void; } diff --git a/engine/sdks/typescript/envoy-client/src/tasks/envoy/events.ts b/engine/sdks/typescript/envoy-client/src/tasks/envoy/events.ts index 22ee15783b..17247dbdf1 100644 --- a/engine/sdks/typescript/envoy-client/src/tasks/envoy/events.ts +++ b/engine/sdks/typescript/envoy-client/src/tasks/envoy/events.ts @@ -6,9 +6,7 @@ import { wsSend } from "../connection.js"; export function handleSendEvents( ctx: EnvoyContext, events: protocol.EventWrapper[], -): boolean { - let stop = false; - +) { // Record in history per actor for (const event of events) { const entry = getActorEntry( @@ -24,9 +22,6 @@ export function handleSendEvents( if (event.inner.tag === "EventActorStateUpdate") { if (event.inner.val.state.tag === "ActorStateStopped") { entry.handle.close(); - - // Serverless envoys only handle one actor which means if it stops, the envoy should stop too - if (ctx.serverless) stop = true; } } } @@ -37,8 +32,6 @@ export function handleSendEvents( tag: "ToRivetEvents", val: events, }); - - return stop; } export function handleAckEvents( diff --git a/engine/sdks/typescript/envoy-client/src/tasks/envoy/index.ts b/engine/sdks/typescript/envoy-client/src/tasks/envoy/index.ts index 9daefd8137..efd0a19087 100644 --- a/engine/sdks/typescript/envoy-client/src/tasks/envoy/index.ts +++ b/engine/sdks/typescript/envoy-client/src/tasks/envoy/index.ts @@ -32,9 +32,10 @@ import { sleep, spawn, watch, WatchReceiver, WatchSender } from "antiox"; import { BufferMap, EnvoyShutdownError } from "@/utils.js"; import { stringifyToEnvoy } from "@/stringify.js"; +let GLOBAL_ENVOY: EnvoyHandle | undefined = undefined; + export interface EnvoyContext { shared: SharedContext; - serverless: boolean; shuttingDown: boolean; actors: Map>; kvRequests: Map; @@ -95,6 +96,8 @@ export async function startEnvoy(config: EnvoyConfig): Promise { // Must manually wait for envoy to start. export function startEnvoySync(config: EnvoyConfig): EnvoyHandle { + if (!config.notGlobal && GLOBAL_ENVOY) return GLOBAL_ENVOY; + const [envoyTx, envoyRx] = unboundedChannel(); const [startTx, startRx] = watch(void 0); const actors: Map> = new Map(); @@ -107,11 +110,10 @@ export function startEnvoySync(config: EnvoyConfig): EnvoyHandle { handle: null as any, }; - const connHandle = startConnection(shared); + startConnection(shared); const ctx: EnvoyContext = { shared, - serverless: false, shuttingDown: false, actors, kvRequests: new Map(), @@ -124,6 +126,16 @@ export function startEnvoySync(config: EnvoyConfig): EnvoyHandle { const handle = createHandle(ctx, startRx); shared.handle = handle; + GLOBAL_ENVOY = handle; + + // Register signal handlers + const onSignal = () => { + log(ctx.shared)?.info({ msg: "received stop signal, starting envoy shutdown" }); + handle.shutdown(false); + }; + process.once("SIGINT", onSignal); + process.once("SIGTERM", onSignal); + log(ctx.shared)?.info({ msg: "starting envoy" }); spawn(async () => { @@ -136,7 +148,6 @@ export function startEnvoySync(config: EnvoyConfig): EnvoyHandle { }, KV_CLEANUP_INTERVAL_MS); let lostTimeout: NodeJS.Timeout | undefined = undefined; - let serverlessShutdown = false; for await (const msg of envoyRx) { if (msg.type === "conn-message") { @@ -145,15 +156,7 @@ export function startEnvoySync(config: EnvoyConfig): EnvoyHandle { handleConnClose(ctx, lostTimeout); if (msg.evict) break; } else if (msg.type === "send-events") { - const stop = handleSendEvents(ctx, msg.events); - - if (stop) { - serverlessShutdown = true; - log(ctx.shared)?.info({ - msg: "serverless actor stopped, stopping envoy" - }); - break; - } + handleSendEvents(ctx, msg.events); } else if (msg.type === "kv-request") { handleKvRequest(ctx, msg); } else if (msg.type === "buffer-tunnel-msg") { @@ -188,14 +191,9 @@ export function startEnvoySync(config: EnvoyConfig): EnvoyHandle { msg: "envoy stopped", }); - ctx.shared.config.onShutdown(serverlessShutdown ? "serverless-early-exit" : "normal"); + ctx.shared.config.onShutdown(); }); - // Queue start actor - if (shared.config.serverlessStartPayload) { - handle.startServerless(shared.config.serverlessStartPayload); - } - return handle; } @@ -576,10 +574,7 @@ function createHandle( sendHibernatableWebSocketMessageAck(ctx, gatewayId, requestId, clientMessageIndex); }, - startServerless(payload: ArrayBuffer) { - if (ctx.serverless) throw new Error("Already started serverless actor"); - ctx.serverless = true; - + startServerlessActor(payload: ArrayBuffer) { let version = new DataView(payload).getUint16(0, true); if (version != protocol.VERSION) @@ -588,9 +583,9 @@ function createHandle( // Skip first 2 bytes (version) const message = protocol.decodeToEnvoy(new Uint8Array(payload, 2)); - if (message.tag !== "ToEnvoyCommands") throw new Error("invalid serverless body"); - if (message.val.length !== 1) throw new Error("invalid serverless body"); - if (message.val[0].inner.tag !== "CommandStartActor") throw new Error("invalid serverless body"); + if (message.tag !== "ToEnvoyCommands") throw new Error("invalid serverless payload"); + if (message.val.length !== 1) throw new Error("invalid serverless payload"); + if (message.val[0].inner.tag !== "CommandStartActor") throw new Error("invalid serverless payload"); // Wait for envoy to start before adding message startedPromise.then(() => { diff --git a/rivetkit-typescript/packages/rivetkit/runtime/index.ts b/rivetkit-typescript/packages/rivetkit/runtime/index.ts index b0f43dabb9..51971cdb7f 100644 --- a/rivetkit-typescript/packages/rivetkit/runtime/index.ts +++ b/rivetkit-typescript/packages/rivetkit/runtime/index.ts @@ -244,7 +244,7 @@ export class Runtime { startServerless(): void { if (this.#startKind === "serverless") return; - invariant(!this.#startKind, "Runtime already started as runner"); + invariant(!this.#startKind, "Runtime already started as serverful"); this.#startKind = "serverless"; this.#serverlessRouter = buildServerlessRouter( diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts index ba999184b5..fffadc7e69 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -90,14 +90,14 @@ export class EngineActorDriver implements ActorDriver { #actorRouter: ActorRouter; #sqlitePool: SqliteVfsPoolManager; - #envoyStarted: PromiseWithResolvers = promiseWithResolvers( + #envoyStarted: PromiseWithResolvers = promiseWithResolvers( (reason) => logger().warn({ msg: "unhandled envoy started promise rejection", reason, }), ); - #envoyStopped: PromiseWithResolvers = promiseWithResolvers( + #envoyStopped: PromiseWithResolvers = promiseWithResolvers( (reason) => logger().warn({ msg: "unhandled envoy stopped promise rejection", @@ -168,8 +168,8 @@ export class EngineActorDriver implements ActorDriver { rivetkit: { version: VERSION }, }, prepopulateActorNames: buildActorNames(config), - onShutdown: (reason: utils.ShutdownReason) => { - this.#envoyStopped.resolve(reason); + onShutdown: () => { + this.#envoyStopped.resolve(); this.#isEnvoyStopped = true; }, fetch: this.#envoyFetch.bind(this), @@ -191,7 +191,7 @@ export class EngineActorDriver implements ActorDriver { this.#envoy = envoy; envoy.started().then(() => { - this.#envoyStarted.resolve(undefined); + this.#envoyStarted.resolve(); }); logger().debug({ @@ -514,18 +514,12 @@ export class EngineActorDriver implements ActorDriver { // NOTE: onAbort does not work reliably stream.onAbort(() => { }); c.req.raw.signal.addEventListener("abort", () => { - logger().debug("SSE aborted, shutting down runner"); - - // We cannot assume that the request will always be closed gracefully by Rivet. We always proceed with a graceful shutdown in case the request was terminated for any other reason. - // - // If we did not use a graceful shutdown, the runner would - this.shutdown(false); + logger().debug("SSE aborted"); }); await this.#envoyStarted.promise; - // Runner id should be set if the runner started - this.#envoy.startServerless(payload); + this.#envoy.startServerlessActor(payload); // Send ping every second to keep the connection alive while (true) { @@ -548,12 +542,6 @@ export class EngineActorDriver implements ActorDriver { await stream.writeSSE({ event: "ping", data: "" }); await stream.sleep(ENVOY_SSE_PING_INTERVAL); } - - // Wait for the runner to stop if the SSE stream aborted early for any reason - let reason = await this.#envoyStopped.promise; - if (reason === "serverless-early-exit") { - stream.writeSSE({ event: "stopping", data: "" }); - } }); } @@ -783,7 +771,7 @@ export class EngineActorDriver implements ActorDriver { }); try { - this.#envoy.stopActor(actorId, undefined); + this.#envoy.stopActor(actorId, undefined); } catch (stopError) { logger().debug({ msg: "failed to stop actor after start failure",