Skip to content

Commit bffdb50

Browse files
committed
fix(rivetkit): standardize startEngine + runtime mode based on config
1 parent 79767d3 commit bffdb50

5 files changed

Lines changed: 302 additions & 37 deletions

File tree

rivetkit-typescript/packages/rivetkit/src/registry/config/index.ts

Lines changed: 83 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,21 @@
11
import { z } from "zod";
22
import { getRunMetadata } from "@/actor/config";
33
import type {
4-
BaseActorDefinition,
54
AnyActorDefinition,
5+
BaseActorDefinition,
66
} from "@/actor/definition";
77
import {
88
KEYS,
9-
queueMetadataKey,
109
queueMessagesPrefix,
10+
queueMetadataKey,
1111
workflowStoragePrefix,
1212
} from "@/actor/keys";
1313
import { ENGINE_ENDPOINT } from "@/common/engine";
1414
import { type Logger, LogLevelSchema } from "@/common/log";
15-
import { DeepReadonly, VERSION } from "@/utils";
15+
import { VERSION } from "@/utils";
1616
import { tryParseEndpoint } from "@/utils/endpoint-parser";
1717
import {
18+
getNodeEnv,
1819
getRivetEndpoint,
1920
getRivetEngine,
2021
getRivetNamespace,
@@ -37,7 +38,9 @@ export type RegistryActors = z.infer<typeof ActorsSchema>;
3738
export const TestConfigSchema = z.object({ enabled: z.boolean() });
3839
export type TestConfig = z.infer<typeof TestConfigSchema>;
3940

40-
// TODO: Add sane defaults for NODE_ENV=development
41+
const RuntimeModeSchema = z.enum(["envoy", "serverless"]);
42+
export type RuntimeMode = z.infer<typeof RuntimeModeSchema>;
43+
4144
export const RegistryConfigSchema = z
4245
.object({
4346
// MARK: Actors
@@ -144,12 +147,18 @@ export const RegistryConfigSchema = z
144147
httpHost: z.string().optional(),
145148

146149
// MARK: Engine
150+
/**
151+
* @experimental
152+
*
153+
* Runtime mode to use when `registry.start()` is called.
154+
*/
155+
mode: RuntimeModeSchema.optional(),
147156
/**
148157
* @experimental
149158
*
150159
* Starts the full Rust engine process locally.
151160
*/
152-
startEngine: z.boolean().default(() => getRivetRunEngine()),
161+
startEngine: z.boolean().optional(),
153162
/** @experimental */
154163
engineVersion: z
155164
.string()
@@ -193,7 +202,12 @@ export const RegistryConfigSchema = z
193202
*
194203
* Must be >= rivetkit-core's drain timeout (20s) + margin.
195204
*/
196-
gracePeriodMs: z.number().int().min(1_000).optional().default(30_000),
205+
gracePeriodMs: z
206+
.number()
207+
.int()
208+
.min(1_000)
209+
.optional()
210+
.default(30_000),
197211
/**
198212
* If true, rivetkit will not install SIGINT/SIGTERM handlers.
199213
* Use when the host application owns signal policy and will
@@ -202,10 +216,19 @@ export const RegistryConfigSchema = z
202216
disableSignalHandlers: z.boolean().optional().default(false),
203217
})
204218
.optional()
205-
.default(() => ({ gracePeriodMs: 30_000, disableSignalHandlers: false })),
219+
.default(() => ({
220+
gracePeriodMs: 30_000,
221+
disableSignalHandlers: false,
222+
})),
206223
})
207224
.transform((config, ctx) => {
208225
const isDevEnv = isDev();
226+
const isProductionEnv = getNodeEnv() === "production";
227+
const envStartEngine = getRivetRunEngine();
228+
const explicitStartEngine =
229+
config.startEngine !== undefined || envStartEngine;
230+
let startEngine = true;
231+
let runtimeMode: RuntimeMode = "envoy";
209232

210233
// Parse endpoint string (env var fallback is applied via transform above)
211234
const parsedEndpoint = config.endpoint
@@ -217,16 +240,55 @@ export const RegistryConfigSchema = z
217240
})
218241
: undefined;
219242

220-
// Can't start a local engine and connect to a remote endpoint.
221-
if (config.startEngine && parsedEndpoint) {
243+
if (isProductionEnv) {
244+
startEngine = false;
245+
runtimeMode = "serverless";
246+
}
247+
248+
if (parsedEndpoint) {
249+
startEngine = false;
250+
runtimeMode = "serverless";
251+
}
252+
253+
if (config.mode === "envoy") {
254+
startEngine = false;
255+
runtimeMode = "envoy";
256+
} else if (config.mode === "serverless") {
257+
startEngine = false;
258+
runtimeMode = "serverless";
259+
}
260+
261+
if (explicitStartEngine) {
262+
startEngine = config.startEngine ?? envStartEngine;
263+
if (startEngine) {
264+
runtimeMode = "envoy";
265+
}
266+
}
267+
268+
if (explicitStartEngine && startEngine && parsedEndpoint) {
269+
ctx.addIssue({
270+
code: "custom",
271+
message:
272+
"cannot specify startEngine: true with a Rivet endpoint",
273+
});
274+
}
275+
276+
if (!startEngine && !parsedEndpoint) {
222277
ctx.addIssue({
223278
code: "custom",
224-
message: "cannot specify both startEngine and endpoint",
279+
message: "Rivet endpoint is required when startEngine is false",
280+
});
281+
}
282+
283+
if (runtimeMode === "serverless" && startEngine) {
284+
ctx.addIssue({
285+
code: "custom",
286+
message: "serverless runtime cannot start the local engine",
225287
});
226288
}
227289

228290
// configurePool requires an engine (via endpoint or startEngine).
229-
if (config.configurePool && !parsedEndpoint && !config.startEngine) {
291+
if (config.configurePool && !parsedEndpoint && !startEngine) {
230292
ctx.addIssue({
231293
code: "custom",
232294
message:
@@ -236,12 +298,11 @@ export const RegistryConfigSchema = z
236298

237299
// Flatten the endpoint and apply defaults for namespace/token
238300
// If startEngine is enabled, set endpoint to the engine endpoint.
239-
const endpoint = config.startEngine
301+
const endpoint = startEngine
240302
? ENGINE_ENDPOINT
241-
: (parsedEndpoint?.endpoint ??
242-
(isDevEnv ? ENGINE_ENDPOINT : undefined));
303+
: parsedEndpoint?.endpoint;
243304
const validateServerlessEndpoint = Boolean(
244-
config.startEngine || parsedEndpoint,
305+
startEngine || parsedEndpoint,
245306
);
246307
// Namespace priority: parsed from endpoint URL > config value (includes env var) > "default"
247308
const namespace =
@@ -272,7 +333,7 @@ export const RegistryConfigSchema = z
272333
// In dev mode, clients connect directly to the local Rivet Engine.
273334
const publicEndpoint =
274335
parsedPublicEndpoint?.endpoint ??
275-
(isDevEnv && config.startEngine ? ENGINE_ENDPOINT : undefined);
336+
(isDevEnv && startEngine ? ENGINE_ENDPOINT : undefined);
276337
// We extract publicNamespace to validate that it matches the backend
277338
// namespace (see validation above), not for functional use.
278339
const publicNamespace = parsedPublicEndpoint?.namespace;
@@ -282,6 +343,8 @@ export const RegistryConfigSchema = z
282343
// If endpoint is set or starting the engine, we'll use the engine driver.
283344
return {
284345
...config,
346+
startEngine,
347+
runtimeMode,
285348
endpoint,
286349
namespace,
287350
token,
@@ -516,11 +579,14 @@ export const DocRegistryConfigSchema = z
516579
.string()
517580
.optional()
518581
.describe("Host to bind the local HTTP server to."),
582+
mode: RuntimeModeSchema.optional().describe(
583+
"Runtime mode for registry.start(). Defaults to 'envoy' for local development and 'serverless' when a Rivet endpoint or production environment is configured.",
584+
),
519585
startEngine: z
520586
.boolean()
521587
.optional()
522588
.describe(
523-
"Starts the full Rust engine process locally. Default: false",
589+
"Starts the full Rust engine process locally. Defaults to true for local development and false when a Rivet endpoint or production environment is configured.",
524590
),
525591
engineVersion: z
526592
.string()

rivetkit-typescript/packages/rivetkit/src/registry/index.ts

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1+
import { ENGINE_ENDPOINT } from "@/common/engine";
2+
import { configureServerlessPool } from "@/serverless/configure";
3+
import { VERSION } from "@/utils";
14
import {
25
type RegistryActors,
36
type RegistryConfig,
47
type RegistryConfigInput,
58
RegistryConfigSchema,
69
} from "./config";
7-
import { ENGINE_ENDPOINT } from "@/common/engine";
810
import { logger } from "./log";
911
import { buildNativeRegistry } from "./native";
10-
import { configureServerlessPool } from "@/serverless/configure";
11-
import { VERSION } from "@/utils";
1212

1313
type ShutdownSignal = "SIGINT" | "SIGTERM";
1414

@@ -102,7 +102,9 @@ export class Registry<A extends RegistryActors> {
102102
}
103103

104104
let settled = false;
105-
let controllerRef: ReadableStreamDefaultController<Uint8Array> | undefined;
105+
let controllerRef:
106+
| ReadableStreamDefaultController<Uint8Array>
107+
| undefined;
106108
const backpressureWaiters: Array<() => void> = [];
107109
const resolveBackpressure = () => {
108110
while (
@@ -138,7 +140,7 @@ export class Registry<A extends RegistryActors> {
138140
headers[key] = value;
139141
});
140142

141-
let head;
143+
let head: { status: number; headers: Record<string, string> };
142144
try {
143145
head = await registry.handleServerlessRequest(
144146
{
@@ -241,6 +243,19 @@ export class Registry<A extends RegistryActors> {
241243
}
242244
}
243245

246+
/**
247+
* Prepares serverless deployments without starting a persistent envoy.
248+
*/
249+
#startServerless(config: RegistryConfig, printWelcome: boolean) {
250+
if (printWelcome) {
251+
this.#printWelcome(config, "serverless");
252+
}
253+
if (config.configurePool && !this.#configureServerlessPoolPromise) {
254+
this.#configureServerlessPoolPromise =
255+
configureServerlessPool(config);
256+
}
257+
}
258+
244259
#installSignalHandlers(
245260
config: RegistryConfig,
246261
nativeRegistryPromise: ReturnType<typeof buildNativeRegistry>,
@@ -319,10 +334,11 @@ export class Registry<A extends RegistryActors> {
319334
})(),
320335
];
321336
if (this.#nativeServerlessPromise) {
337+
const nativeServerlessPromise = this.#nativeServerlessPromise;
322338
registries.push(
323339
(async () => {
324340
try {
325-
const { registry } = await this.#nativeServerlessPromise!;
341+
const { registry } = await nativeServerlessPromise;
326342
await registry.shutdown();
327343
} catch (err) {
328344
logger().warn(
@@ -335,11 +351,12 @@ export class Registry<A extends RegistryActors> {
335351
}
336352
await Promise.all(registries);
337353

338-
if (this.#nativeServePromise) {
354+
const nativeServePromise = this.#nativeServePromise;
355+
if (nativeServePromise !== undefined) {
339356
// Swallow rejection so the race doesn't itself reject; the
340357
// always-attached `.catch` at the promise assignment site has
341358
// already logged any serve-side error.
342-
await this.#nativeServePromise.catch(() => undefined);
359+
await nativeServePromise.catch(() => undefined);
343360
}
344361
};
345362
await Promise.race([
@@ -353,10 +370,9 @@ export class Registry<A extends RegistryActors> {
353370
}
354371

355372
#removeSignalHandlers(): void {
356-
for (const [signal, handler] of Object.entries(this.#signalHandlers) as [
357-
ShutdownSignal,
358-
() => void,
359-
][]) {
373+
for (const [signal, handler] of Object.entries(
374+
this.#signalHandlers,
375+
) as [ShutdownSignal, () => void][]) {
360376
if (handler) process.removeListener(signal, handler);
361377
}
362378
this.#signalHandlers = {};
@@ -377,7 +393,11 @@ export class Registry<A extends RegistryActors> {
377393
*/
378394
public start() {
379395
const config = this.parseConfig();
380-
this.#startEnvoy(config, true);
396+
if (config.runtimeMode === "envoy") {
397+
this.#startEnvoy(config, true);
398+
} else {
399+
this.#startServerless(config, true);
400+
}
381401
}
382402

383403
#printWelcome(

rivetkit-typescript/packages/rivetkit/src/utils.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
import { stringifyError } from "@/common/utils";
21
import type { Context as HonoContext, Handler as HonoHandler } from "hono";
3-
import { stringify as uuidstringify } from "uuid";
2+
import { stringifyError } from "@/common/utils";
43
import pkgJson from "../package.json" with { type: "json" };
54
import { getLogger } from "./common/log";
65
import { assertUnreachable } from "./common/utils";
@@ -143,10 +142,17 @@ export type GetUpgradeWebSocket = () => UpgradeWebSocket;
143142
* @experimental
144143
*/
145144
export function getEnvUniversal(key: string): string | undefined {
146-
if (typeof Deno !== "undefined") {
147-
return Deno.env.get(key);
145+
const global = globalThis as typeof globalThis & {
146+
Deno?: { env?: { get?: (key: string) => string | undefined } };
147+
Bun?: { env?: Record<string, string | undefined> };
148+
};
149+
150+
if (typeof global.Deno?.env?.get === "function") {
151+
return global.Deno.env.get(key);
152+
} else if (global.Bun?.env?.[key] !== undefined) {
153+
return global.Bun.env[key];
148154
} else if (typeof process !== "undefined") {
149-
// Do this after Deno since `process` is sometimes polyfilled
155+
// Do this after Deno and Bun since `process` is sometimes polyfilled.
150156
return process.env[key];
151157
}
152158
}

rivetkit-typescript/packages/rivetkit/tests/native-runtime-errors.test.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1+
import { describe, expect, test } from "vitest";
12
import { actor, setup } from "@/mod";
23
import { RivetError } from "../src/actor/errors";
34
import {
4-
NativeActorContextAdapter,
55
buildNativeRegistry,
6+
NativeActorContextAdapter,
67
} from "../src/registry/native";
7-
import { describe, expect, test } from "vitest";
88

99
const testActor = actor({
1010
state: {},
@@ -83,6 +83,7 @@ describe("native runtime config errors", () => {
8383
test: testActor,
8484
},
8585
startEngine: false,
86+
endpoint: "http://127.0.0.1:6642",
8687
});
8788
const config = registry.parseConfig();
8889
config.endpoint = undefined as never;

0 commit comments

Comments
 (0)