Skip to content

Commit 6c9af76

Browse files
committed
fix(envoy): use global instance, add signal handlers
1 parent de70131 commit 6c9af76

File tree

6 files changed

+38
-57
lines changed

6 files changed

+38
-57
lines changed

engine/sdks/typescript/envoy-client/src/config.ts

Lines changed: 6 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/sdks/typescript/envoy-client/src/handle.ts

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/sdks/typescript/envoy-client/src/tasks/envoy/events.ts

Lines changed: 1 addition & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/sdks/typescript/envoy-client/src/tasks/envoy/index.ts

Lines changed: 21 additions & 26 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rivetkit-typescript/packages/rivetkit/runtime/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ export class Runtime<A extends RegistryActors> {
244244

245245
startServerless(): void {
246246
if (this.#startKind === "serverless") return;
247-
invariant(!this.#startKind, "Runtime already started as runner");
247+
invariant(!this.#startKind, "Runtime already started as serverful");
248248
this.#startKind = "serverless";
249249

250250
this.#serverlessRouter = buildServerlessRouter(

rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,14 @@ export class EngineActorDriver implements ActorDriver {
9090
#actorRouter: ActorRouter;
9191
#sqlitePool: SqliteVfsPoolManager;
9292

93-
#envoyStarted: PromiseWithResolvers<undefined> = promiseWithResolvers(
93+
#envoyStarted: PromiseWithResolvers<void> = promiseWithResolvers(
9494
(reason) =>
9595
logger().warn({
9696
msg: "unhandled envoy started promise rejection",
9797
reason,
9898
}),
9999
);
100-
#envoyStopped: PromiseWithResolvers<utils.ShutdownReason> = promiseWithResolvers(
100+
#envoyStopped: PromiseWithResolvers<void> = promiseWithResolvers(
101101
(reason) =>
102102
logger().warn({
103103
msg: "unhandled envoy stopped promise rejection",
@@ -168,8 +168,8 @@ export class EngineActorDriver implements ActorDriver {
168168
rivetkit: { version: VERSION },
169169
},
170170
prepopulateActorNames: buildActorNames(config),
171-
onShutdown: (reason: utils.ShutdownReason) => {
172-
this.#envoyStopped.resolve(reason);
171+
onShutdown: () => {
172+
this.#envoyStopped.resolve();
173173
this.#isEnvoyStopped = true;
174174
},
175175
fetch: this.#envoyFetch.bind(this),
@@ -191,7 +191,7 @@ export class EngineActorDriver implements ActorDriver {
191191
this.#envoy = envoy;
192192

193193
envoy.started().then(() => {
194-
this.#envoyStarted.resolve(undefined);
194+
this.#envoyStarted.resolve();
195195
});
196196

197197
logger().debug({
@@ -514,18 +514,12 @@ export class EngineActorDriver implements ActorDriver {
514514
// NOTE: onAbort does not work reliably
515515
stream.onAbort(() => { });
516516
c.req.raw.signal.addEventListener("abort", () => {
517-
logger().debug("SSE aborted, shutting down runner");
518-
519-
// We cannot assume that the request will always be closed gracefully by Rivet. We always proceed with a graceful shutdown in case the request was terminated for any other reason.
520-
//
521-
// If we did not use a graceful shutdown, the runner would
522-
this.shutdown(false);
517+
logger().debug("SSE aborted");
523518
});
524519

525520
await this.#envoyStarted.promise;
526521

527-
// Runner id should be set if the runner started
528-
this.#envoy.startServerless(payload);
522+
this.#envoy.startServerlessActor(payload);
529523

530524
// Send ping every second to keep the connection alive
531525
while (true) {
@@ -548,12 +542,6 @@ export class EngineActorDriver implements ActorDriver {
548542
await stream.writeSSE({ event: "ping", data: "" });
549543
await stream.sleep(ENVOY_SSE_PING_INTERVAL);
550544
}
551-
552-
// Wait for the runner to stop if the SSE stream aborted early for any reason
553-
let reason = await this.#envoyStopped.promise;
554-
if (reason === "serverless-early-exit") {
555-
stream.writeSSE({ event: "stopping", data: "" });
556-
}
557545
});
558546
}
559547

@@ -783,7 +771,7 @@ export class EngineActorDriver implements ActorDriver {
783771
});
784772

785773
try {
786-
this.#envoy.stopActor(actorId, undefined);
774+
this.#envoy.stopActor(actorId, undefined);
787775
} catch (stopError) {
788776
logger().debug({
789777
msg: "failed to stop actor after start failure",

0 commit comments

Comments
 (0)