Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion engine/sdks/typescript/envoy-client/src/config.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion engine/sdks/typescript/envoy-client/src/handle.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 21 additions & 26 deletions engine/sdks/typescript/envoy-client/src/tasks/envoy/index.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rivetkit-typescript/packages/rivetkit/runtime/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ export class Runtime<A extends RegistryActors> {

startServerless(): void {
if (this.#startKind === "serverless") return;
invariant(!this.#startKind, "Runtime already started as runner");
invariant(!this.#startKind, "Runtime already started as serverful");
this.#startKind = "serverless";

this.#serverlessRouter = buildServerlessRouter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,14 @@ export class EngineActorDriver implements ActorDriver {
#actorRouter: ActorRouter;
#sqlitePool: SqliteVfsPoolManager;

#envoyStarted: PromiseWithResolvers<undefined> = promiseWithResolvers(
#envoyStarted: PromiseWithResolvers<void> = promiseWithResolvers(
(reason) =>
logger().warn({
msg: "unhandled envoy started promise rejection",
reason,
}),
);
#envoyStopped: PromiseWithResolvers<utils.ShutdownReason> = promiseWithResolvers(
#envoyStopped: PromiseWithResolvers<void> = promiseWithResolvers(
(reason) =>
logger().warn({
msg: "unhandled envoy stopped promise rejection",
Expand Down Expand Up @@ -168,8 +168,8 @@ export class EngineActorDriver implements ActorDriver {
rivetkit: { version: VERSION },
},
prepopulateActorNames: buildActorNames(config),
onShutdown: (reason: utils.ShutdownReason) => {
this.#envoyStopped.resolve(reason);
onShutdown: () => {
this.#envoyStopped.resolve();
this.#isEnvoyStopped = true;
},
fetch: this.#envoyFetch.bind(this),
Expand All @@ -191,7 +191,7 @@ export class EngineActorDriver implements ActorDriver {
this.#envoy = envoy;

envoy.started().then(() => {
this.#envoyStarted.resolve(undefined);
this.#envoyStarted.resolve();
});

logger().debug({
Expand Down Expand Up @@ -514,18 +514,12 @@ export class EngineActorDriver implements ActorDriver {
// NOTE: onAbort does not work reliably
stream.onAbort(() => { });
c.req.raw.signal.addEventListener("abort", () => {
logger().debug("SSE aborted, shutting down runner");

// We cannot assume that the request will always be closed gracefully by Rivet. We always proceed with a graceful shutdown in case the request was terminated for any other reason.
//
// If we did not use a graceful shutdown, the runner would
this.shutdown(false);
logger().debug("SSE aborted");
});

await this.#envoyStarted.promise;

// Runner id should be set if the runner started
this.#envoy.startServerless(payload);
this.#envoy.startServerlessActor(payload);

// Send ping every second to keep the connection alive
while (true) {
Expand All @@ -548,12 +542,6 @@ export class EngineActorDriver implements ActorDriver {
await stream.writeSSE({ event: "ping", data: "" });
await stream.sleep(ENVOY_SSE_PING_INTERVAL);
}

// Wait for the runner to stop if the SSE stream aborted early for any reason
let reason = await this.#envoyStopped.promise;
if (reason === "serverless-early-exit") {
stream.writeSSE({ event: "stopping", data: "" });
}
});
}

Expand Down Expand Up @@ -783,7 +771,7 @@ export class EngineActorDriver implements ActorDriver {
});

try {
this.#envoy.stopActor(actorId, undefined);
this.#envoy.stopActor(actorId, undefined);
} catch (stopError) {
logger().debug({
msg: "failed to stop actor after start failure",
Expand Down
Loading