diff --git a/package.json b/package.json index 9c64e824c3..f00caee55f 100644 --- a/package.json +++ b/package.json @@ -71,7 +71,7 @@ "consola": "^3.4.2", "crossws": "^0.4.6", "db0": "^0.3.4", - "env-runner": "^0.1.14", + "env-runner": "^0.1.15", "h3": "^2.0.1-rc.22", "hookable": "^6.1.1", "nf3": "^0.3.18", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c8d98a39b3..9a8a820bc3 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -30,8 +30,8 @@ importers: specifier: '*' version: 17.4.2 env-runner: - specifier: ^0.1.14 - version: 0.1.14(@vercel/queue@0.3.1)(miniflare@4.20260625.0)(wrangler@4.105.0) + specifier: ^0.1.15 + version: 0.1.15(@vercel/queue@0.3.1)(miniflare@4.20260625.0)(wrangler@4.105.0) h3: specifier: ^2.0.1-rc.22 version: 2.0.1-rc.22(crossws@0.4.6) @@ -3813,12 +3813,12 @@ packages: resolution: {integrity: sha512-TWrgLOFUQTH994YUyl1yT4uyavY5nNB5muff+RtWaqNVCAK408b5ZnnbNAUEWLTCpum9w6arT70i1XdQ4UeOPA==} engines: {node: '>=0.12'} - env-runner@0.1.14: - resolution: {integrity: sha512-qdk5mmgFsd+zPg3r1bkZ+IbvpfUfypyDvNhMGypSMRpz7kOa/kI6SpW8fgyukuEM4Lo24M65r+1Ne0DtT7vFBA==} + env-runner@0.1.15: + resolution: {integrity: sha512-Xiu7VUW1jlwEiZSGWJ5Wuv9D4iSC2JwIB3Bb/RI+1DBSCeCrCpdAwRvsi3e52wETPUBkV61b0Ixfi593hQmQ6w==} hasBin: true peerDependencies: '@netlify/runtime': ^4.1.23 - '@vercel/queue': ^0.2.0 + '@vercel/queue': '>=0.2.0' miniflare: ^4.20260515.0 wrangler: ^4.0.0 peerDependenciesMeta: @@ -9787,7 +9787,7 @@ snapshots: entities@7.0.1: {} - env-runner@0.1.14(@vercel/queue@0.3.1)(miniflare@4.20260625.0)(wrangler@4.105.0): + env-runner@0.1.15(@vercel/queue@0.3.1)(miniflare@4.20260625.0)(wrangler@4.105.0): dependencies: crossws: 0.4.6(srvx@0.11.18) exsolve: 1.1.0 diff --git a/src/dev/server.ts b/src/dev/server.ts index a6b5e33d87..a3853b43cb 100644 --- a/src/dev/server.ts +++ b/src/dev/server.ts @@ -2,17 +2,23 @@ import type { IncomingMessage } from "node:http"; import type { Socket } from "node:net"; import type { FSWatcher } from "chokidar"; import type { ServerOptions, Server } from "srvx"; -import type { EnvRunnerData, RunnerMessageListener, RunnerRPCHooks } from "env-runner"; +import type { + EnvRunnerData, + RunnerMessageListener, + RunnerRPCHooks, + WorkerAddress, +} from "env-runner"; import type { RunnerName } from "env-runner"; import { RunnerManager, loadRunner } from "env-runner"; import type { Nitro } from "nitro/types"; import { HTTPError } from "h3"; +import { createWebSocketProxy } from "crossws"; import consola from "consola"; import { resolve } from "pathe"; import { watch } from "chokidar"; -import { serve } from "srvx/node"; +import { serve } from "srvx"; import { debounce } from "perfect-debounce"; import { isTest, isCI } from "std-env"; import { NitroDevApp } from "./app.ts"; @@ -31,6 +37,7 @@ export class NitroDevServer extends NitroDevApp implements RunnerRPCHooks { #workerIdCtr: number = 0; #workerError?: unknown; #workerRetries: number = 0; + #workerAddr?: WorkerAddress; #building?: boolean = true; // Assume initial build will start soon #buildError?: unknown; #reloadPromise?: Promise; @@ -69,6 +76,7 @@ export class NitroDevServer extends NitroDevApp implements RunnerRPCHooks { this.#manager = new RunnerManager(); this.#manager.onReady(async (_runner, addr) => { this.#workerRetries = 0; + this.#workerAddr = addr; writeDevBuildInfo(this.nitro, addr).catch((error) => { this.nitro.logger.warn( `Failed to write dev build info: ${error instanceof Error ? error.message : String(error)}` @@ -129,19 +137,42 @@ export class NitroDevServer extends NitroDevApp implements RunnerRPCHooks { statusText: "Worker does not support upgrades.", }); } + // Upgrades can arrive while the worker is (re)building; the runner drops + // them when it isn't ready yet, so wait for it before proxying. + for (let i = 0; i < 200 && !this.#manager.ready; i++) { + await new Promise((r) => setTimeout(r, 50)); + } return this.#manager.upgrade({ node: { req, socket, head } }); } - listen(opts?: Partial>): Server { + async listen(opts?: Partial>): Promise { + const websocket = + this.nitro.options.features.websocket ?? this.nitro.options.experimental.websocket; + + const plugins = [...(opts?.plugins ?? [])]; + + // Bun/Deno serve natively and expose no Node.js upgrade socket, so the raw + // `http.Server` `"upgrade"` proxy can't work there (Bun also drops manual + // upgrade writes and never surfaces the `101` on its `node:http` client). + // Bridge the WebSocket to the worker with `crossws` + a `WebSocket` client. + // On Node the native upgrade event proxies the raw socket directly (below). + const nativeRuntime = "Bun" in globalThis || "Deno" in globalThis; + if (websocket && nativeRuntime) { + plugins.push(await createWebSocketProxyPlugin(() => this.#workerAddr)); + } + const server = serve({ ...opts, fetch: this.fetch, + plugins, gracefulShutdown: false, }); this.#listeners.push(server); - if (server.node?.server) { + + if (websocket && !nativeRuntime && server.node?.server) { server.node.server.on("upgrade", (req, sock, head) => this.upgrade(req, sock, head)); } + return server; } @@ -248,3 +279,60 @@ export class NitroDevServer extends NitroDevApp implements RunnerRPCHooks { // #endregion } + +type CrosswsPlugin = Awaited["plugin"]; +type SrvxPlugin = ReturnType; + +/** + * WebSocket reverse-proxy plugin bridging the dev server to the worker, used on + * Bun/Deno. Those runtimes serve natively (no Node.js upgrade socket to proxy), + * so the client WebSocket is terminated with `crossws` and proxied to the dev + * worker over a standard `WebSocket` client. + */ +async function createWebSocketProxyPlugin( + getAddress: () => WorkerAddress | undefined +): Promise { + const { plugin } = + "Bun" in globalThis ? await import("crossws/server/bun") : await import("crossws/server/deno"); + + const proxy = createWebSocketProxy({ + target: (peer) => { + const addr = getAddress(); + if (!addr?.port) { + throw new Error("Dev worker is not ready"); + } + const { pathname, search } = new URL(peer.request.url); + return `ws://${addr.host || "127.0.0.1"}:${addr.port}${pathname}${search}`; + }, + // Resolve the forwarded subprotocol defensively: on Deno the request is no + // longer readable inside the `open` hook (after `Deno.upgradeWebSocket()`). + forwardProtocol: (peer) => { + try { + const header = peer.request.headers.get("sec-websocket-protocol"); + return header + ? header + .split(",") + .map((p) => p.trim()) + .filter(Boolean) + : undefined; + } catch { + return undefined; + } + }, + }); + + // The upgrade can arrive before the worker has reported its address (e.g. + // right after a reload). The `upgrade` hook is awaited by every srvx adapter, + // so wait here for the worker to become ready before proxying. + const hooks = { + ...proxy, + async upgrade(request: Request) { + for (let i = 0; i < 200 && !getAddress()?.port; i++) { + await new Promise((r) => setTimeout(r, 50)); + } + return proxy.upgrade?.(request); + }, + }; + + return plugin({ resolve: () => hooks }); +} diff --git a/src/presets/_nitro/runtime/nitro-dev.ts b/src/presets/_nitro/runtime/nitro-dev.ts index 2068abd841..a69a70f81d 100644 --- a/src/presets/_nitro/runtime/nitro-dev.ts +++ b/src/presets/_nitro/runtime/nitro-dev.ts @@ -18,19 +18,13 @@ if (import.meta._tasks) { startScheduleRunner({}); } -const ws = import.meta._websocket - ? await import("crossws/adapters/node").then((m) => - (m.default || m)({ resolve: resolveWebsocketHooks }) - ) - : undefined; - export default { fetch: nitroApp.fetch, plugins: [...tracingSrvxPlugins], - upgrade: ws - ? (context: { node: { req: any; socket: any; head: any } }) => { - ws.handleUpgrade(context.node.req, context.node.socket, context.node.head); - } + // Let the dev runner attach the runtime-appropriate crossws adapter + // (node/bun/deno) via `crossws/server` instead of hardcoding the Node.js one. + websocket: import.meta._websocket + ? ({ resolve: resolveWebsocketHooks } as AppEntry["websocket"]) : undefined, ipc: { onClose: () => nitroHooks.callHook("close"), diff --git a/src/runtime/internal/app.ts b/src/runtime/internal/app.ts index 2625046dc8..9fbbff5aea 100644 --- a/src/runtime/internal/app.ts +++ b/src/runtime/internal/app.ts @@ -52,10 +52,21 @@ export function serverFetch( } } -export async function resolveWebsocketHooks(req: ServerRequest): Promise> { - // https://github.com/h3js/h3/blob/c11ca743d476e583b3b47de1717e6aae92114357/src/utils/ws.ts#L37 - const hooks = ((await serverFetch(req)) as any).crossws as Partial; - return hooks || {}; +// crossws resolves hooks on every lifecycle event (upgrade, open, message, +// close) by re-running the app handler. Cache the result per request: it avoids +// re-running the whole pipeline on each message and, more importantly, the only +// readable point is the initial `upgrade` (e.g. on Deno the request is no longer +// readable after `Deno.upgradeWebSocket()`). +// https://github.com/h3js/h3/blob/c11ca743d476e583b3b47de1717e6aae92114357/src/utils/ws.ts#L37 +const websocketHooksCache = new WeakMap>>(); + +export function resolveWebsocketHooks(req: ServerRequest): Promise> { + let hooks = websocketHooksCache.get(req); + if (!hooks) { + hooks = serverFetch(req).then((res) => ((res as any).crossws as Partial) || {}); + websocketHooksCache.set(req, hooks); + } + return hooks; } export function fetch( diff --git a/test/fixture/nitro.config.ts b/test/fixture/nitro.config.ts index 6680c45f3c..b99ae91b56 100644 --- a/test/fixture/nitro.config.ts +++ b/test/fixture/nitro.config.ts @@ -24,6 +24,7 @@ export default defineConfig({ // @ts-expect-error __vitePkg__: process.env.NITRO_VITE_PKG, framework: { name: "nitro", version: "3.x" }, + features: { websocket: true }, imports: { presets: [ { diff --git a/test/fixture/server/routes/ws.ts b/test/fixture/server/routes/ws.ts new file mode 100644 index 0000000000..f2a4ab4847 --- /dev/null +++ b/test/fixture/server/routes/ws.ts @@ -0,0 +1,14 @@ +import { defineWebSocketHandler } from "h3"; + +export default defineWebSocketHandler({ + open(peer) { + peer.send("connected"); + }, + message(peer, message) { + if (message.text() === "ping") { + peer.send("pong"); + } else { + peer.send(`echo:${message.text()}`); + } + }, +}); diff --git a/test/presets/nitro-dev.test.ts b/test/presets/nitro-dev.test.ts index b9aa335836..1ce7613494 100644 --- a/test/presets/nitro-dev.test.ts +++ b/test/presets/nitro-dev.test.ts @@ -22,6 +22,28 @@ describe("nitro:preset:nitro-dev", async () => { expect(status).toBe(200); }); + describe("websocket", () => { + it("upgrades and echoes messages in dev mode", async () => { + const wsURL = ctx.server!.url.replace(/^http/, "ws") + "ws"; + const ws = new WebSocket(wsURL); + const messages: string[] = []; + await new Promise((resolve, reject) => { + const timer = setTimeout(() => reject(new Error("websocket timeout")), 5000); + ws.addEventListener("error", (error) => reject(error as any)); + ws.addEventListener("message", (event) => { + messages.push(String(event.data)); + if (messages.length === 2) { + clearTimeout(timer); + ws.close(); + resolve(); + } + }); + ws.addEventListener("open", () => ws.send("ping")); + }); + expect(messages).toEqual(["connected", "pong"]); + }); + }); + describe("tasks", () => { it("GET /_nitro/tasks lists tasks", async () => { const { data, status } = await callHandler({ url: "/_nitro/tasks" }); diff --git a/test/presets/vercel.test.ts b/test/presets/vercel.test.ts index bf9d1e121a..8fa8ec8862 100644 --- a/test/presets/vercel.test.ts +++ b/test/presets/vercel.test.ts @@ -221,6 +221,10 @@ describe("nitro:preset:vercel:web", async () => { "dest": "/_vercel/queues/consumer", "src": "/_vercel/queues/consumer", }, + { + "dest": "/ws", + "src": "/ws", + }, { "dest": "/wasm/static-import", "src": "/wasm/static-import", @@ -538,6 +542,7 @@ describe("nitro:preset:vercel:web", async () => { "functions/wait-until.func (symlink)", "functions/wasm/dynamic-import.func (symlink)", "functions/wasm/static-import.func (symlink)", + "functions/ws.func (symlink)", ] `); });