diff --git a/README.md b/README.md index 7e9b2c5..e2e14c3 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,30 @@ fastify.listen({ port: 3000 }, err => { In this case, it will respond with a 404 error on every unregistered route, closing the incoming upgrade connection requests. +### HTTP/2 Support + +This plugin supports WebSocket connections over HTTP/2 using the [RFC 8441](https://datatracker.ietf.org/doc/html/rfc8441) Extended CONNECT Protocol. When using Fastify with `http2: true`, WebSocket connections are automatically handled over HTTP/2 streams. + +```js +'use strict' + +const fastify = require('fastify')({ http2: true }) +fastify.register(require('@fastify/websocket')) +fastify.register(async function (fastify) { + fastify.get('/', { websocket: true }, (socket, req) => { + socket.on('message', message => { + socket.send('hi from HTTP/2 server') + }) + }) +}) + +fastify.listen({ port: 3000 }) +``` + +HTTP/2 WebSocket connections use the CONNECT method with the `:protocol` pseudo-header set to `websocket`. The plugin automatically enables the `enableConnectProtocol` setting on HTTP/2 servers. + +**Note:** HTTP/2 WebSocket support requires Node.js with HTTP/2 support and clients that implement RFC 8441. + However, you can still define a wildcard route, that will be used as the default handler: ```js diff --git a/index.js b/index.js index 46f5a48..cb0acd6 100644 --- a/index.js +++ b/index.js @@ -9,6 +9,8 @@ const Duplexify = require('duplexify') const kWs = Symbol('ws-socket') const kWsHead = Symbol('ws-head') +const kWsHttp2 = Symbol('ws-http2') +const kWsHttp2Handled = Symbol('ws-http2-handled') const statusCodeReg = /HTTP\/1.1 (\d+)/u function fastifyWebsocket (fastify, opts, next) { @@ -51,6 +53,16 @@ function fastifyWebsocket (fastify, opts, next) { const wss = new WebSocket.Server(wssOptions) fastify.decorate('websocketServer', wss) + // Check if this is an HTTP/2 server by checking for http2 module's server types + const isHttp2Server = websocketListenServer.constructor.name === 'Http2Server' || + websocketListenServer.constructor.name === 'Http2SecureServer' + + // For HTTP/2 servers, enable the extended CONNECT protocol (RFC 8441) + // This allows WebSocket connections over HTTP/2 + if (isHttp2Server && typeof websocketListenServer.updateSettings === 'function') { + websocketListenServer.updateSettings({ enableConnectProtocol: true }) + } + // TODO: place upgrade context as options async function injectWS (path = '/', upgradeContext = {}, options = {}) { const server2Client = new PassThrough() @@ -109,6 +121,53 @@ function fastifyWebsocket (fastify, opts, next) { fastify.decorate('injectWS', injectWS) + // HTTP/2 WebSocket handler (RFC 8441 - Extended CONNECT Protocol) + // For HTTP/2, WebSocket connections use CONNECT method with :protocol pseudo-header + function onHttp2Stream (stream, headers) { + // Mark this stream as handled by websocket plugin + // This prevents Fastify's HTTP/2 compatibility layer from processing it + stream[kWsHttp2Handled] = true + + // Get the path from :path pseudo-header + /* c8 ignore next */ + const path = headers[':path'] || '/' + + // Create a minimal request object that mimics an HTTP/1.1 request for routing + // The HTTP/2 stream will be stored in kWs and used as the WebSocket transport + const rawRequest = { + method: 'GET', // WebSocket routes are registered as GET + url: path, + headers: { + ...headers, + // Add headers that WebSocket routes might check + connection: 'upgrade', + upgrade: 'websocket' + }, + httpVersion: '2.0', + socket: stream.session.socket, + [kWs]: stream, + [kWsHead]: Buffer.alloc(0), + [kWsHttp2]: true + } + + // Create a mock response object for Fastify routing + // We use a PassThrough stream as a sink instead of the real HTTP/2 stream + // This prevents ServerResponse from interfering with the HTTP/2 stream + // The actual response (status 200) is sent in handleHttp2WebSocket + const mockSocket = new PassThrough() + mockSocket.remoteAddress = stream.session?.socket?.remoteAddress + const rawResponse = new ServerResponse(rawRequest) + rawResponse.assignSocket(mockSocket) + + try { + fastify.routing(rawRequest, rawResponse) + /* c8 ignore next 4 */ + } catch (err) { + fastify.log.warn({ err }, 'http2 websocket connection failed') + stream.close() + } + } + function onUpgrade (rawRequest, socket, head) { // Save a reference to the socket and then dispatch the request through the normal fastify router so that it will invoke hooks and then eventually a route handler that might upgrade the socket. rawRequest[kWs] = socket @@ -117,10 +176,41 @@ function fastifyWebsocket (fastify, opts, next) { try { rawResponse.assignSocket(socket) fastify.routing(rawRequest, rawResponse) + /* c8 ignore next 3 */ } catch (err) { fastify.log.warn({ err }, 'websocket upgrade failed') } } + + // For HTTP/1.1 servers, listen to upgrade event + // For HTTP/2 servers, we need to intercept the stream event and prevent Fastify's + // internal handler from processing WebSocket CONNECT requests + if (isHttp2Server) { + // Get Fastify's original stream handler + const listeners = websocketListenServer.listeners('stream') + const fastifyHandler = listeners.find(l => l.name === 'bound onServerStream') + + if (fastifyHandler) { + // Remove Fastify's handler + websocketListenServer.removeListener('stream', fastifyHandler) + + // Add a wrapper that handles WebSocket CONNECT or delegates to Fastify + websocketListenServer.on('stream', function onStreamWrapper (stream, headers) { + // Check if this is a WebSocket CONNECT request + if (headers[':method'] === 'CONNECT' && headers[':protocol'] === 'websocket') { + // Handle as WebSocket - don't let Fastify process this stream + onHttp2Stream(stream, headers) + } else { + // Let Fastify handle regular HTTP/2 requests + fastifyHandler(stream, headers) + } + }) + /* c8 ignore next 4 */ + } else { + // Fallback: if we can't find Fastify's handler, just prepend ours + websocketListenServer.prependListener('stream', onHttp2Stream) + } + } websocketListenServer.on('upgrade', onUpgrade) const handleUpgrade = (rawRequest, callback) => { @@ -135,6 +225,51 @@ function fastifyWebsocket (fastify, opts, next) { }) } + // Handle HTTP/2 WebSocket connections (RFC 8441) + // For HTTP/2, we respond with status 200 and then create WebSocket over the stream + const handleHttp2WebSocket = (rawRequest, callback) => { + const stream = rawRequest[kWs] + const head = rawRequest[kWsHead] + + // Respond to the CONNECT request - this establishes the WebSocket tunnel + // For HTTP/2, we respond with status 200 (not 101 like HTTP/1.1) + stream.respond({ ':status': 200 }) + + // Create a WebSocket instance and set the stream as its socket + const socket = new WebSocket(null, undefined, {}) + + // IMPORTANT: _isServer must be set explicitly before setSocket + // This ensures proper WebSocket frame masking (server expects masked frames from clients) + socket._isServer = true + + // HTTP/2 streams don't have setNoDelay, so we add a no-op + if (!stream.setNoDelay) { + stream.setNoDelay = () => {} + } + + // Set the socket using the HTTP/2 stream as the transport + // maxPayload from wssOptions or use a large default + const maxPayload = wssOptions.maxPayload || 104857600 // 100MB default + socket.setSocket(stream, head, { maxPayload }) + + // Track the client in the WebSocket server if client tracking is enabled + if (wss.options.clientTracking) { + wss.clients.add(socket) + socket.on('close', () => { + wss.clients.delete(socket) + }) + } + + wss.emit('connection', socket, rawRequest) + + /* c8 ignore next 3 */ + socket.on('error', (error) => { + fastify.log.error(error) + }) + + callback(socket) + } + fastify.addHook('onRequest', (request, _reply, done) => { // this adds req.ws to the Request object if (request.raw[kWs]) { request.ws = true @@ -146,7 +281,14 @@ function fastifyWebsocket (fastify, opts, next) { fastify.addHook('onResponse', (request, _reply, done) => { if (request.ws) { - request.raw[kWs].destroy() + const stream = request.raw[kWs] + // For HTTP/2 streams, use close() if destroy() is not available + if (typeof stream.destroy === 'function') { + stream.destroy() + /* c8 ignore next 3 */ + } else if (typeof stream.close === 'function') { + stream.close() + } } done() }) @@ -188,7 +330,13 @@ function fastifyWebsocket (fastify, opts, next) { // within the route handler, we check if there has been a connection upgrade by looking at request.raw[kWs]. we need to dispatch the normal HTTP handler if not, and hijack to dispatch the websocket handler if so if (request.raw[kWs]) { reply.hijack() - handleUpgrade(request.raw, socket => { + + // Use appropriate handler based on HTTP version + // HTTP/2 WebSocket (RFC 8441) uses handleHttp2WebSocket + // HTTP/1.1 WebSocket uses handleUpgrade + const upgradeHandler = request.raw[kWsHttp2] ? handleHttp2WebSocket : handleUpgrade + + upgradeHandler(request.raw, socket => { let result try { if (isWebsocketRoute) { @@ -221,6 +369,15 @@ function fastifyWebsocket (fastify, opts, next) { } fastify.server.removeListener('upgrade', onUpgrade) + if (isHttp2Server) { + // Remove our stream wrapper/handler + const listeners = fastify.server.listeners('stream') + /* c8 ignore next */ + const ourHandler = listeners.find(l => l.name === 'onStreamWrapper' || l.name === 'onHttp2Stream') + if (ourHandler) { + fastify.server.removeListener('stream', ourHandler) + } + } server.close(done) diff --git a/test/http2.test.js b/test/http2.test.js new file mode 100644 index 0000000..3366f9d --- /dev/null +++ b/test/http2.test.js @@ -0,0 +1,318 @@ +'use strict' + +const { test } = require('node:test') +const http2 = require('node:http2') +const Fastify = require('fastify') +const fastifyWebsocket = require('..') +const WebSocket = require('ws') +const { once } = require('node:events') + +// Helper to create WebSocket client over HTTP/2 stream +function createWsClient (stream) { + const head = Buffer.alloc(0) + const ws = new WebSocket(null, undefined, {}) + // IMPORTANT: _isServer must be set explicitly before setSocket for proper frame masking + ws._isServer = false + stream.setNoDelay = () => {} + return { ws, head } +} + +// Helper to setup WebSocket on HTTP/2 stream and handle the immediate OPEN state +// Note: With HTTP/2, setSocket makes the WebSocket immediately OPEN without emitting 'open' event +function setupWsOnStream (ws, stream, head, onReady) { + ws.setSocket(stream, head, { maxPayload: 104857600 }) + // WebSocket is immediately OPEN after setSocket with HTTP/2 + // Use setImmediate to allow event handlers to be attached first + setImmediate(() => { + if (ws.readyState === WebSocket.OPEN) { + onReady() + } + }) +} + +test('Should expose a websocket over HTTP/2 (RFC 8441)', async (t) => { + t.plan(2) + + const fastify = Fastify({ http2: true }) + t.after(() => fastify.close()) + + await fastify.register(fastifyWebsocket) + + fastify.get('/', { websocket: true }, (socket) => { + t.after(() => socket.terminate()) + + socket.once('message', (chunk) => { + t.assert.deepStrictEqual(chunk.toString(), 'hello server') + socket.send('hello client') + }) + }) + + await fastify.listen({ port: 0 }) + const port = fastify.server.address().port + + // Create HTTP/2 client + const client = http2.connect(`http://localhost:${port}`) + t.after(() => client.close()) + + // Wait for remote settings to be received + const [settings] = await once(client, 'remoteSettings') + + // Check if the server supports extended connect protocol + if (!settings.enableConnectProtocol) { + t.skip('Server does not support extended connect protocol') + return + } + + // Send CONNECT request with :protocol websocket (RFC 8441) + const stream = client.request({ + ':method': 'CONNECT', + ':protocol': 'websocket', + ':path': '/' + }) + + const { ws, head } = createWsClient(stream) + + let resolve + const messagePromise = new Promise((r) => { resolve = r }) + + ws.on('message', (data) => { + resolve(data.toString()) + }) + + ws.on('error', (err) => { + t.fail('WebSocket error: ' + err.message) + }) + + // Wait for response + stream.on('response', (headers) => { + const status = headers[':status'] + if (status === 200) { + setupWsOnStream(ws, stream, head, () => { + ws.send('hello server') + }) + } else { + t.fail('Unexpected status: ' + status) + } + }) + + const message = await messagePromise + t.assert.deepStrictEqual(message, 'hello client') + + ws.close() +}) + +test('Should handle multiple HTTP/2 WebSocket connections', async (t) => { + t.plan(4) + + const fastify = Fastify({ http2: true }) + t.after(() => fastify.close()) + + await fastify.register(fastifyWebsocket) + + fastify.get('/echo', { websocket: true }, (socket) => { + socket.on('message', (data) => { + socket.send('echo: ' + data.toString()) + }) + }) + + await fastify.listen({ port: 0 }) + const port = fastify.server.address().port + + // Create HTTP/2 client (single connection, multiple streams) + const client = http2.connect(`http://localhost:${port}`) + t.after(() => client.close()) + + // Wait for remote settings to be received + const [settings] = await once(client, 'remoteSettings') + + if (!settings.enableConnectProtocol) { + t.skip('Server does not support extended connect protocol') + return + } + + // Helper to create WebSocket over HTTP/2 + const createWsOverHttp2 = async (path, message) => { + const stream = client.request({ + ':method': 'CONNECT', + ':protocol': 'websocket', + ':path': path + }) + + const { ws, head } = createWsClient(stream) + + return new Promise((resolve, reject) => { + ws.on('message', (data) => { + ws.close() + resolve(data.toString()) + }) + + ws.on('error', reject) + + stream.on('response', (headers) => { + if (headers[':status'] === 200) { + setupWsOnStream(ws, stream, head, () => { + ws.send(message) + }) + } else { + reject(new Error('Unexpected status: ' + headers[':status'])) + } + }) + }) + } + + // Create two WebSocket connections over the same HTTP/2 connection + const [result1, result2] = await Promise.all([ + createWsOverHttp2('/echo', 'message1'), + createWsOverHttp2('/echo', 'message2') + ]) + + t.assert.deepStrictEqual(result1, 'echo: message1') + t.assert.deepStrictEqual(result2, 'echo: message2') + + // Create two more to verify continued functionality + const [result3, result4] = await Promise.all([ + createWsOverHttp2('/echo', 'message3'), + createWsOverHttp2('/echo', 'message4') + ]) + + t.assert.deepStrictEqual(result3, 'echo: message3') + t.assert.deepStrictEqual(result4, 'echo: message4') +}) + +test('Should handle regular HTTP/2 requests alongside WebSocket', async (t) => { + t.plan(2) + + const fastify = Fastify({ http2: true }) + t.after(() => fastify.close()) + + await fastify.register(fastifyWebsocket) + + // Regular HTTP route + fastify.get('/api', (request, reply) => { + reply.send({ message: 'hello' }) + }) + + // WebSocket route + fastify.get('/ws', { websocket: true }, (socket) => { + socket.on('message', (data) => { + socket.send('echo: ' + data.toString()) + }) + }) + + await fastify.listen({ port: 0 }) + const port = fastify.server.address().port + + const client = http2.connect(`http://localhost:${port}`) + t.after(() => client.close()) + + // Wait for settings first + const [settings] = await once(client, 'remoteSettings') + + // Test regular HTTP/2 request + const httpResult = await new Promise((resolve, reject) => { + const req = client.request({ ':path': '/api' }) + let data = '' + req.on('data', (chunk) => { data += chunk }) + req.on('end', () => resolve(JSON.parse(data))) + req.on('error', reject) + req.end() + }) + + t.assert.deepStrictEqual(httpResult.message, 'hello') + + // Now test WebSocket over same connection + if (!settings.enableConnectProtocol) { + t.skip('Server does not support extended connect protocol') + return + } + + const wsResult = await new Promise((resolve, reject) => { + const stream = client.request({ + ':method': 'CONNECT', + ':protocol': 'websocket', + ':path': '/ws' + }) + + const { ws, head } = createWsClient(stream) + + ws.on('message', (data) => { + ws.close() + resolve(data.toString()) + }) + + ws.on('error', reject) + + stream.on('response', (headers) => { + if (headers[':status'] === 200) { + setupWsOnStream(ws, stream, head, () => { + ws.send('test') + }) + } else { + reject(new Error('Unexpected status: ' + headers[':status'])) + } + }) + }) + + t.assert.deepStrictEqual(wsResult, 'echo: test') +}) + +test('Should track HTTP/2 WebSocket clients when clientTracking is enabled', async (t) => { + t.plan(2) + + const fastify = Fastify({ http2: true }) + t.after(() => fastify.close()) + + await fastify.register(fastifyWebsocket, { + options: { clientTracking: true } + }) + + fastify.get('/', { websocket: true }, (socket) => { + // Connection established + }) + + await fastify.listen({ port: 0 }) + const port = fastify.server.address().port + + const client = http2.connect(`http://localhost:${port}`) + t.after(() => client.close()) + + // Wait for remote settings to be received + const [settings] = await once(client, 'remoteSettings') + + if (!settings.enableConnectProtocol) { + t.skip('Server does not support extended connect protocol') + return + } + + const stream = client.request({ + ':method': 'CONNECT', + ':protocol': 'websocket', + ':path': '/' + }) + + const { ws, head } = createWsClient(stream) + + const openPromise = new Promise((resolve) => { + stream.on('response', (headers) => { + if (headers[':status'] === 200) { + setupWsOnStream(ws, stream, head, () => { + resolve() + }) + } + }) + }) + + await openPromise + + // Check that the client is tracked + t.assert.deepStrictEqual(fastify.websocketServer.clients.size, 1) + + // Close the websocket + ws.close() + + // Wait a bit for the close to propagate + await new Promise((resolve) => setTimeout(resolve, 100)) + + // Client should be removed from tracking + t.assert.deepStrictEqual(fastify.websocketServer.clients.size, 0) +})